This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch geoparquet-filtering-and-simplification
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit a16bfd496077678f6f426dc5dcd80493e38421ed
Author: Bertil Chapuis <[email protected]>
AuthorDate: Thu Oct 10 01:00:08 2024 +0200

    Add filtering capabilities
---
 .../baremaps/geoparquet/GeoParquetReader.java      |  14 ++-
 .../baremaps/geoparquet/GeoParquetSpliterator.java | 114 ++++++++++++++++++---
 .../baremaps/geoparquet/GeoParquetReaderTest.java  |  22 +++-
 3 files changed, 130 insertions(+), 20 deletions(-)

diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index e2c94fc3..66059c03 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -38,6 +38,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.locationtech.jts.geom.Envelope;
 
 /**
  * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*.
@@ -47,14 +48,20 @@ public class GeoParquetReader {
   protected final Configuration configuration;
   protected final List<FileStatus> files;
   private final AtomicLong groupCount = new AtomicLong(-1);
+  private final Envelope envelope;
 
   public GeoParquetReader(URI uri) {
-    this(uri, createDefaultConfiguration());
+    this(uri, null, createDefaultConfiguration());
   }
 
-  public GeoParquetReader(URI uri, Configuration configuration) {
+  public GeoParquetReader(URI uri, Envelope envelope) {
+    this(uri, envelope, createDefaultConfiguration());
+  }
+
+  public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
     this.configuration = configuration;
     this.files = initializeFiles(uri, configuration);
+    this.envelope = envelope;
   }
 
   private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
@@ -157,7 +164,8 @@ public class GeoParquetReader {
   }
 
   private Stream<GeoParquetGroup> streamGeoParquetGroups(boolean inParallel) {
-    Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(files, configuration, 0, files.size());
+    Spliterator<GeoParquetGroup> spliterator =
+        new GeoParquetSpliterator(files, envelope, configuration, 0, files.size());
     return StreamSupport.stream(spliterator, inParallel);
   }
 
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
index b73e8d19..0a9ba79b 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
@@ -36,13 +37,17 @@ import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.locationtech.jts.geom.Envelope;
 
 class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
 
   private final List<FileStatus> files;
   private final Configuration configuration;
+  private final Envelope envelope;
 
   private ParquetFileReader fileReader;
   private int fileStartIndex;
@@ -56,12 +61,14 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
   private long rowsInCurrentGroup;
 
   GeoParquetSpliterator(
-      List<FileStatus> files,
-      Configuration configuration,
-      int fileStartIndex,
-      int fileEndIndex) {
+          List<FileStatus> files,
+          Envelope envelope,
+          Configuration configuration,
+          int fileStartIndex,
+          int fileEndIndex) {
     this.files = files;
     this.configuration = configuration;
+    this.envelope = envelope;
     this.fileStartIndex = fileStartIndex;
     this.fileEndIndex = fileEndIndex;
     setupReaderForNextFile();
@@ -97,6 +104,86 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
     }
   }
 
+  private FilterPredicate createEnvelopeFilter(MessageType schema, Envelope envelope) {
+    // Check whether the envelope is null or the world
+    if (envelope == null
+    || envelope.isNull()
+    || envelope.equals(new Envelope(-180, 180, -90, 90))) {
+      return null;
+    }
+
+    // Check whether the schema has a bbox field
+    Type type = schema.getType("bbox");
+    if (type == null) {
+      return null;
+    }
+
+    // Check whether the bbox has the xmin, ymin, xmax, ymax fields
+    GroupType bbox = type.asGroupType();
+    if (bbox.getFieldCount() != 4
+        || !bbox.containsField("xmin")
+        || !bbox.containsField("ymin")
+        || !bbox.containsField("xmax")
+        || !bbox.containsField("ymax")) {
+      return null;
+    }
+
+    // Check whether all fields are primitive types
+    List<Type> types = bbox.getFields();
+    if (types.stream().anyMatch(t -> !t.isPrimitive())) {
+      return null;
+    }
+
+    // Check whether all fields are of the same type
+    List<PrimitiveTypeName> typeNames = types.stream()
+        .map(t -> t.asPrimitiveType().getPrimitiveTypeName())
+        .toList();
+    PrimitiveTypeName typeName = typeNames.get(0);
+    if (!typeNames.stream().allMatch(typeName::equals)) {
+      return null;
+    }
+
+    // Check whether all fields are double
+    if (typeName == PrimitiveTypeName.DOUBLE) {
+      return FilterApi.and(
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.doubleColumn("bbox.xmin"),
+                  envelope.getMinX()),
+              FilterApi.ltEq(
+                  FilterApi.doubleColumn("bbox.xmax"),
+                  envelope.getMaxX())),
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.doubleColumn("bbox.ymin"),
+                  envelope.getMinY()),
+              FilterApi.ltEq(
+                  FilterApi.doubleColumn("bbox.ymax"),
+                  envelope.getMaxY())));
+    }
+
+    // Check whether all fields are float
+    if (typeName == PrimitiveTypeName.FLOAT) {
+      return FilterApi.and(
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.floatColumn("bbox.xmin"),
+                  (float) envelope.getMinX()),
+              FilterApi.ltEq(
+                  FilterApi.floatColumn("bbox.xmax"),
+                  (float) envelope.getMaxX())),
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.floatColumn("bbox.ymin"),
+                  (float) envelope.getMinY()),
+              FilterApi.ltEq(
+                  FilterApi.floatColumn("bbox.ymax"),
+                  (float) envelope.getMaxY())));
+    }
+
+    return null;
+  }
+
   private void advanceToNextRowGroup() throws IOException {
     if (currentRowGroup >= fileReader.getRowGroups().size()) {
       setupReaderForNextFile();
@@ -114,7 +201,11 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
 
     GeoParquetGroupRecordMaterializer materializer =
         new GeoParquetGroupRecordMaterializer(schema, metadata);
-    recordReader = columnIO.getRecordReader(pages, materializer, FilterCompat.NOOP);
+
+    FilterPredicate envelopeFilter = createEnvelopeFilter(schema, envelope);
+    Filter filter = envelopeFilter == null ? FilterCompat.NOOP : FilterCompat.get(envelopeFilter);
+
+    recordReader = columnIO.getRecordReader(pages, materializer, filter);
     currentRowGroup++;
   }
 
@@ -132,13 +223,11 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
         }
 
         GeoParquetGroup group = recordReader.read();
-        if (group == null) {
-          // Should not happen unless there is an error
-          throw new GeoParquetException("Unexpected null group read from recordReader.");
+        rowsReadInGroup++;
+        if (group != null) {
+          action.accept(group);
         }
 
-        rowsReadInGroup++;
-        action.accept(group);
         return true;
       }
     } catch (IOException e) {
@@ -166,7 +255,8 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
       return null;
     }
     int mid = fileStartIndex + remainingFiles / 2;
-    GeoParquetSpliterator split = new GeoParquetSpliterator(files, configuration, mid, fileEndIndex);
+    GeoParquetSpliterator split =
+        new GeoParquetSpliterator(files, envelope, configuration, mid, fileEndIndex);
     this.fileEndIndex = mid;
     return split;
   }
diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
index 96628722..d87429b4 100644
--- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
+++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
@@ -26,12 +26,16 @@ import java.util.stream.Stream;
 import org.apache.baremaps.testing.TestFiles;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.locationtech.jts.geom.Envelope;
 
 class GeoParquetReaderTest {
 
-  private static void readGroups(URI geoParquet, boolean parallel,
-      int expectedGroupCount) {
-    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+  private static void readGroups(
+          URI geoParquet,
+          Envelope envelope,
+          boolean parallel,
+                                 int expectedGroupCount) {
+    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, envelope);
     final AtomicInteger groupCount = new AtomicInteger();
     Stream<GeoParquetGroup> geoParquetGroupStream;
     if (parallel) {
@@ -48,7 +52,15 @@ class GeoParquetReaderTest {
     URI geoParquet = TestFiles.GEOPARQUET.toUri();
     final boolean isParallel = false;
     final int expectedGroupCount = 5;
-    readGroups(geoParquet, isParallel, expectedGroupCount);
+    readGroups(geoParquet, null, isParallel, expectedGroupCount);
+  }
+
+  @Test
+  void readFiltered() {
+    URI geoParquet = TestFiles.GEOPARQUET.toUri();
+    final boolean isParallel = false;
+    final int expectedGroupCount = 1;
+    readGroups(geoParquet, new Envelope(-172, -65, 18, 72), isParallel, expectedGroupCount);
   }
 
   @Disabled("Requires access to the Internet")
@@ -59,7 +71,7 @@ class GeoParquetReaderTest {
     final boolean isParallel = true;
     final int expectedGroupCount = 974708;
 
-    readGroups(geoParquet, isParallel, expectedGroupCount);
+    readGroups(geoParquet, null, isParallel, expectedGroupCount);
   }
 
   @Disabled("Requires access to the Internet")

Reply via email to