This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch geoparquet-filtering-and-simplification
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit 38e561c3d91d91a83339bb0333e28ddf965b0832
Author: Bertil Chapuis <[email protected]>
AuthorDate: Sat Oct 5 23:20:28 2024 +0200

    Make the spliterator an internal class
---
 .../storage/geoparquet/GeoParquetDataTable.java    |   2 +-
 .../geoparquet/GeoParquetTypeConversion.java       |   4 +-
 .../geoparquet/GeoParquetGroupSpliterator.java     | 134 ---------------
 .../baremaps/geoparquet/GeoParquetReader.java      | 186 +++++++++++++++------
 .../baremaps/geoparquet/data/GeoParquetGroup.java  |  14 +-
 .../geoparquet/data/GeoParquetGroupImpl.java       |  10 +-
 6 files changed, 148 insertions(+), 202 deletions(-)

diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
index 79836b2e..90c78d1a 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
@@ -61,7 +61,7 @@ public class GeoParquetDataTable implements DataTable {
   @Override
   public Stream<DataRow> parallelStream() {
     return reader.readParallel().map(group -> new DataRowImpl(
-        GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
+        GeoParquetTypeConversion.asSchema(path.toString(), 
group.getGeoParquetSchema()),
         GeoParquetTypeConversion.asRowValues(group)));
   }
 
diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java
index 435effa5..eed61e8d 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java
@@ -67,7 +67,7 @@ public class GeoParquetTypeConversion {
 
   public static List<Object> asRowValues(GeoParquetGroup group) {
     List<Object> values = new ArrayList<>();
-    Schema schema = group.getSchema();
+    Schema schema = group.getGeoParquetSchema();
     List<Field> fields = schema.fields();
     for (int i = 0; i < fields.size(); i++) {
       if (group.getValues(i).isEmpty()) {
@@ -93,7 +93,7 @@ public class GeoParquetTypeConversion {
 
   public static Map<String, Object> asNested(GeoParquetGroup group) {
     Map<String, Object> nested = new HashMap<>();
-    Schema schema = group.getSchema();
+    Schema schema = group.getGeoParquetSchema();
     List<Field> fields = schema.fields();
     for (int i = 0; i < fields.size(); i++) {
       if (group.getValues(i).isEmpty()) {
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
deleted file mode 100644
index 0fe0702a..00000000
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.baremaps.geoparquet;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Spliterator;
-import java.util.function.Consumer;
-import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
-import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.parquet.hadoop.ParquetReader;
-
-class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
-
-  private final GeoParquetReader geoParquetReader;
-  private final List<FileStatus> fileStatuses;
-  private int currentFileIndex;
-  private int currentEndIndex;
-  private ParquetReader<GeoParquetGroup> reader;
-
-  public GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, 
List<FileStatus> files) {
-    this(geoParquetReader, files, 0, files.size());
-  }
-
-  GeoParquetGroupSpliterator(
-      GeoParquetReader geoParquetReader,
-      List<FileStatus> fileStatuses,
-      int startIndex,
-      int endIndex) {
-    this.geoParquetReader = geoParquetReader;
-    this.fileStatuses = fileStatuses;
-    this.currentFileIndex = startIndex;
-    this.currentEndIndex = endIndex;
-    setupReaderForNextFile();
-  }
-
-  private void setupReaderForNextFile() {
-    closeCurrentReader();
-
-    if (currentFileIndex >= currentEndIndex) {
-      reader = null;
-      return;
-    }
-
-    FileStatus fileStatus = fileStatuses.get(currentFileIndex++);
-    try {
-      reader = createParquetReader(fileStatus);
-    } catch (IOException e) {
-      throw new GeoParquetException("Failed to create reader for " + 
fileStatus, e);
-    }
-  }
-
-  private void closeCurrentReader() {
-    if (reader != null) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        // Ignore exceptions during close
-      }
-      reader = null;
-    }
-  }
-
-  @Override
-  public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
-    try {
-      while (true) {
-        if (reader == null) {
-          return false;
-        }
-
-        GeoParquetGroup group = reader.read();
-
-        if (group == null) {
-          setupReaderForNextFile();
-          continue;
-        }
-
-        action.accept(group);
-        return true;
-      }
-    } catch (IOException e) {
-      closeCurrentReader();
-      throw new GeoParquetException("IOException caught while trying to read 
the next file.", e);
-    }
-  }
-
-  private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
-      throws IOException {
-    return ParquetReader.builder(new GeoParquetGroupReadSupport(), 
file.getPath())
-        .withConf(geoParquetReader.getConfiguration())
-        .build();
-  }
-
-  @Override
-  public Spliterator<GeoParquetGroup> trySplit() {
-    int remainingFiles = currentEndIndex - currentFileIndex;
-    if (remainingFiles <= 1) {
-      return null;
-    }
-    int mid = currentFileIndex + remainingFiles / 2;
-    GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator(
-        geoParquetReader, fileStatuses, mid, currentEndIndex);
-    this.currentEndIndex = mid;
-    return split;
-  }
-
-  @Override
-  public long estimateSize() {
-    // Return Long.MAX_VALUE as the actual number of elements is unknown
-    return Long.MAX_VALUE;
-  }
-
-  @Override
-  public int characteristics() {
-    return NONNULL | IMMUTABLE;
-  }
-}
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index 41a41db2..3a9a705b 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -30,6 +31,7 @@ import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
 import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema;
 import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory;
 import org.apache.baremaps.geoparquet.data.GeoParquetMetadata;
+import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
@@ -46,42 +49,20 @@ import org.apache.parquet.schema.MessageType;
  */
 public class GeoParquetReader {
 
-  private final URI uri;
   private final Configuration configuration;
   private final List<FileStatus> files;
   private final AtomicLong groupCount = new AtomicLong(-1);
 
-  private static class FileInfo {
-    final FileStatus file;
-    final long recordCount;
-    final Map<String, String> keyValueMetadata;
-    final MessageType messageType;
-    final GeoParquetMetadata metadata;
-    final Schema geoParquetSchema;
-
-    FileInfo(FileStatus file, long recordCount, Map<String, String> 
keyValueMetadata,
-        MessageType messageType, GeoParquetMetadata metadata,
-        Schema geoParquetSchema) {
-      this.file = file;
-      this.recordCount = recordCount;
-      this.keyValueMetadata = keyValueMetadata;
-      this.messageType = messageType;
-      this.metadata = metadata;
-      this.geoParquetSchema = geoParquetSchema;
-    }
-  }
-
   public GeoParquetReader(URI uri) {
-    this(uri, createConfiguration());
+    this(uri, createDefaultConfiguration());
   }
 
   public GeoParquetReader(URI uri, Configuration configuration) {
-    this.uri = uri;
     this.configuration = configuration;
-    this.files = initializeFiles();
+    this.files = initializeFiles(uri, configuration);
   }
 
-  private List<FileStatus> initializeFiles() {
+  private static List<FileStatus> initializeFiles(URI uri, Configuration 
configuration) {
     try {
       Path globPath = new Path(uri.getPath());
       FileSystem fileSystem = FileSystem.get(uri, configuration);
@@ -105,22 +86,18 @@ public class GeoParquetReader {
 
   private FileInfo getFileInfo(FileStatus fileStatus) {
     try {
-      long recordCount;
-      MessageType messageType;
-      Map<String, String> keyValueMetadata;
-
       ParquetMetadata parquetMetadata =
           ParquetFileReader.readFooter(configuration, fileStatus.getPath());
-      recordCount = parquetMetadata.getBlocks().stream()
+      long recordCount = parquetMetadata.getBlocks().stream()
           .mapToLong(BlockMetaData::getRowCount)
           .sum();
 
-      messageType = parquetMetadata.getFileMetaData().getSchema();
-      keyValueMetadata = 
parquetMetadata.getFileMetaData().getKeyValueMetaData();
+      MessageType messageType = parquetMetadata.getFileMetaData().getSchema();
+      Map<String, String> keyValueMetadata =
+          parquetMetadata.getFileMetaData().getKeyValueMetaData();
 
       GeoParquetMetadata geoParquetMetadata = null;
       Schema geoParquetSchema = null;
-
       if (keyValueMetadata.containsKey("geo")) {
         ObjectMapper objectMapper = new ObjectMapper();
         
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
@@ -130,8 +107,14 @@ public class GeoParquetReader {
             GeoParquetGroupFactory.createGeoParquetSchema(messageType, 
geoParquetMetadata);
       }
 
-      return new FileInfo(fileStatus, recordCount, keyValueMetadata, 
messageType,
-          geoParquetMetadata, geoParquetSchema);
+      return new FileInfo(
+          fileStatus,
+          recordCount,
+          keyValueMetadata,
+          messageType,
+          geoParquetMetadata,
+          geoParquetSchema);
+
     } catch (IOException e) {
       throw new GeoParquetException("Failed to build FileInfo for file: " + 
fileStatus, e);
     }
@@ -141,16 +124,20 @@ public class GeoParquetReader {
     return files.stream()
         .findFirst()
         .map(this::getFileInfo)
-        .orElseThrow(
-            () -> new GeoParquetException("No files available to read 
metadata.")).metadata;
+        .orElseThrow(this::noParquetFilesAvailable)
+        .metadata();
   }
 
   public Schema getGeoParquetSchema() {
     return files.stream()
         .findFirst()
         .map(this::getFileInfo)
-        .orElseThrow(
-            () -> new GeoParquetException("No files available to read 
schema.")).geoParquetSchema;
+        .orElseThrow(this::noParquetFilesAvailable)
+        .geoParquetSchema();
+  }
+
+  public GeoParquetException noParquetFilesAvailable() {
+    return new GeoParquetException("No parquet files available.");
   }
 
   public boolean validateSchemasAreIdentical() {
@@ -173,13 +160,8 @@ public class GeoParquetReader {
     return groupCount.get();
   }
 
-  public Stream<GeoParquetGroup> readParallel() {
-    return retrieveGeoParquetGroups(true);
-  }
-
   private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) 
{
-    Spliterator<GeoParquetGroup> spliterator =
-        new GeoParquetGroupSpliterator(this, files, 0, files.size());
+    Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(0, 
files.size());
     return StreamSupport.stream(spliterator, inParallel);
   }
 
@@ -187,7 +169,11 @@ public class GeoParquetReader {
     return retrieveGeoParquetGroups(false);
   }
 
-  private static Configuration createConfiguration() {
+  public Stream<GeoParquetGroup> readParallel() {
+    return retrieveGeoParquetGroups(true);
+  }
+
+  private static Configuration createDefaultConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com");
     conf.set("fs.s3a.aws.credentials.provider", 
AnonymousAWSCredentialsProvider.class.getName());
@@ -196,15 +182,109 @@ public class GeoParquetReader {
     return conf;
   }
 
-  public URI getUri() {
-    return uri;
-  }
+  private record FileInfo(
+      FileStatus file,
+      long recordCount,
+      Map<String, String> keyValueMetadata,
+      MessageType messageType,
+      GeoParquetMetadata metadata,
+      Schema geoParquetSchema) {
 
-  public Configuration getConfiguration() {
-    return configuration;
   }
 
-  public List<FileStatus> getFiles() {
-    return files;
+  private class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
+
+    private int currentFileIndex;
+    private int currentEndIndex;
+    private ParquetReader<GeoParquetGroup> reader;
+
+    GeoParquetSpliterator(
+        int startIndex,
+        int endIndex) {
+      this.currentFileIndex = startIndex;
+      this.currentEndIndex = endIndex;
+      setupReaderForNextFile();
+    }
+
+    private void setupReaderForNextFile() {
+      closeCurrentReader();
+
+      if (currentFileIndex >= currentEndIndex) {
+        reader = null;
+        return;
+      }
+
+      FileStatus fileStatus = files.get(currentFileIndex++);
+      try {
+        reader = createParquetReader(fileStatus);
+      } catch (IOException e) {
+        throw new GeoParquetException("Failed to create reader for " + 
fileStatus, e);
+      }
+    }
+
+    private void closeCurrentReader() {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          // Ignore exceptions during close
+        }
+        reader = null;
+      }
+    }
+
+    @Override
+    public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
+      try {
+        while (true) {
+          if (reader == null) {
+            return false;
+          }
+
+          GeoParquetGroup group = reader.read();
+
+          if (group == null) {
+            setupReaderForNextFile();
+            continue;
+          }
+
+          action.accept(group);
+          return true;
+        }
+      } catch (IOException e) {
+        closeCurrentReader();
+        throw new GeoParquetException("IOException caught while trying to read 
the next file.", e);
+      }
+    }
+
+    private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
+        throws IOException {
+      return ParquetReader.builder(new GeoParquetGroupReadSupport(), 
file.getPath())
+          .withConf(configuration)
+          .build();
+    }
+
+    @Override
+    public Spliterator<GeoParquetGroup> trySplit() {
+      int remainingFiles = currentEndIndex - currentFileIndex;
+      if (remainingFiles <= 1) {
+        return null;
+      }
+      int mid = currentFileIndex + remainingFiles / 2;
+      GeoParquetSpliterator split = new GeoParquetSpliterator(mid, 
currentEndIndex);
+      this.currentEndIndex = mid;
+      return split;
+    }
+
+    @Override
+    public long estimateSize() {
+      // Return Long.MAX_VALUE as the actual number of elements is unknown
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public int characteristics() {
+      return NONNULL | IMMUTABLE;
+    }
   }
 }
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java
index 46e57bb3..81366348 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java
@@ -30,19 +30,19 @@ import org.locationtech.jts.geom.Geometry;
 public interface GeoParquetGroup {
 
   /**
-   * Returns the GeoParquet schema of the group built upon the Parquet schema 
and the GeoParquet
-   * metadata.
+   * Returns the Parquet schema of the group.
    *
-   * @return the GeoParquet schema
+   * @return the Parquet schema
    */
-  Schema getSchema();
+  GroupType getParquetSchema();
 
   /**
-   * Returns the Parquet schema of the group.
+   * Returns the GeoParquet schema of the group built upon the Parquet schema 
and the GeoParquet
+   * metadata.
    *
-   * @return the Parquet schema
+   * @return the GeoParquet schema
    */
-  GroupType getParquetSchema();
+  Schema getGeoParquetSchema();
 
   /**
    * Returns the GeoParquet metadata of the group.
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
index a108e383..2a41d531 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
@@ -288,7 +288,7 @@ public class GeoParquetGroupImpl implements GeoParquetGroup {
   }
 
   @Override
-  public Schema getSchema() {
+  public Schema getGeoParquetSchema() {
     return geoParquetSchema;
   }
 
@@ -437,16 +437,16 @@ public class GeoParquetGroupImpl implements GeoParquetGroup {
   @Override
   public List<Envelope> getEnvelopeValues(int fieldIndex) {
     return getGroupValues(fieldIndex).stream().map(group -> {
-      double xMin = group.getSchema().fields().get(0).type().equals(Type.FLOAT)
+      double xMin = 
group.getGeoParquetSchema().fields().get(0).type().equals(Type.FLOAT)
           ? (double) group.getFloatValue(0)
           : group.getDoubleValue(0);
-      double yMin = group.getSchema().fields().get(1).type().equals(Type.FLOAT)
+      double yMin = 
group.getGeoParquetSchema().fields().get(1).type().equals(Type.FLOAT)
           ? (double) group.getFloatValue(1)
           : group.getDoubleValue(1);
-      double xMax = group.getSchema().fields().get(2).type().equals(Type.FLOAT)
+      double xMax = 
group.getGeoParquetSchema().fields().get(2).type().equals(Type.FLOAT)
           ? (double) group.getFloatValue(2)
           : group.getDoubleValue(2);
-      double yMax = group.getSchema().fields().get(0).type().equals(Type.FLOAT)
+      double yMax = 
group.getGeoParquetSchema().fields().get(0).type().equals(Type.FLOAT)
           ? (double) group.getFloatValue(3)
           : group.getDoubleValue(3);
       return new Envelope(xMin, xMax, yMin, yMax);

Reply via email to