This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch geoparquet-filtering-and-simplification
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit a16bfd496077678f6f426dc5dcd80493e38421ed
Author: Bertil Chapuis <[email protected]>
AuthorDate: Thu Oct 10 01:00:08 2024 +0200

    Add filtering capabilities
---
 .../baremaps/geoparquet/GeoParquetReader.java      |  14 ++-
 .../baremaps/geoparquet/GeoParquetSpliterator.java | 114 ++++++++++++++++++---
 .../baremaps/geoparquet/GeoParquetReaderTest.java  |  22 +++-
 3 files changed, 130 insertions(+), 20 deletions(-)

diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index e2c94fc3..66059c03 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -38,6 +38,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.locationtech.jts.geom.Envelope;
 
 /**
  * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*.
@@ -47,14 +48,20 @@ public class GeoParquetReader {
   protected final Configuration configuration;
   protected final List<FileStatus> files;
   private final AtomicLong groupCount = new AtomicLong(-1);
+  private final Envelope envelope;
 
   public GeoParquetReader(URI uri) {
-    this(uri, createDefaultConfiguration());
+    this(uri, null, createDefaultConfiguration());
   }
 
-  public GeoParquetReader(URI uri, Configuration configuration) {
+  public GeoParquetReader(URI uri, Envelope envelope) {
+    this(uri, envelope, createDefaultConfiguration());
+  }
+
+  public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) {
     this.configuration = configuration;
     this.files = initializeFiles(uri, configuration);
+    this.envelope = envelope;
   }
 
   private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) {
@@ -157,7 +164,8 @@ public class GeoParquetReader {
   }
 
   private Stream<GeoParquetGroup> streamGeoParquetGroups(boolean inParallel) {
-    Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(files, configuration, 0, files.size());
+    Spliterator<GeoParquetGroup> spliterator =
+        new GeoParquetSpliterator(files, envelope, configuration, 0, files.size());
     return StreamSupport.stream(spliterator, inParallel);
   }
 
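For context, a minimal usage sketch of the new envelope-aware constructor (not part of the patch). The read() call and the file URI are assumptions made for illustration; the public read methods of GeoParquetReader are not shown in this excerpt, only the private streamGeoParquetGroups(boolean) helper they would delegate to.

import java.net.URI;
import org.locationtech.jts.geom.Envelope;

class EnvelopeFilterUsageSketch {
  public static void main(String[] args) {
    // Envelope expressed in the same CRS and units as the bbox column (here, WGS84 degrees).
    Envelope envelope = new Envelope(-172, -65, 18, 72);

    // New two-argument constructor added by this commit; passing null disables filtering.
    GeoParquetReader reader =
        new GeoParquetReader(URI.create("file:///path/to/data.parquet"), envelope);

    // Hypothetical read() method returning a Stream<GeoParquetGroup>.
    long count = reader.read().count();
    System.out.println("Groups whose bbox falls within the envelope: " + count);
  }
}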
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
index b73e8d19..0a9ba79b 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
@@ -36,13 +37,17 @@ import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.locationtech.jts.geom.Envelope;
 
 class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
 
   private final List<FileStatus> files;
   private final Configuration configuration;
+  private final Envelope envelope;
 
   private ParquetFileReader fileReader;
   private int fileStartIndex;
@@ -56,12 +61,14 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
   private long rowsInCurrentGroup;
 
   GeoParquetSpliterator(
-      List<FileStatus> files,
-      Configuration configuration,
-      int fileStartIndex,
-      int fileEndIndex) {
+      List<FileStatus> files,
+      Envelope envelope,
+      Configuration configuration,
+      int fileStartIndex,
+      int fileEndIndex) {
     this.files = files;
     this.configuration = configuration;
+    this.envelope = envelope;
     this.fileStartIndex = fileStartIndex;
     this.fileEndIndex = fileEndIndex;
     setupReaderForNextFile();
@@ -97,6 +104,86 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
     }
   }
 
+  private FilterPredicate createEnvelopeFilter(MessageType schema, Envelope envelope) {
+    // Check whether the envelope is null or the world
+    if (envelope == null
+        || envelope.isNull()
+        || envelope.equals(new Envelope(-180, 180, -90, 90))) {
+      return null;
+    }
+
+    // Check whether the schema has a bbox field
+    Type type = schema.getType("bbox");
+    if (type == null) {
+      return null;
+    }
+
+    // Check whether the bbox has the xmin, ymin, xmax, ymax fields
+    GroupType bbox = type.asGroupType();
+    if (bbox.getFieldCount() != 4
+        || !bbox.containsField("xmin")
+        || !bbox.containsField("ymin")
+        || !bbox.containsField("xmax")
+        || !bbox.containsField("ymax")) {
+      return null;
+    }
+
+    // Check whether all fields are primitive types
+    List<Type> types = bbox.getFields();
+    if (types.stream().anyMatch(t -> !t.isPrimitive())) {
+      return null;
+    }
+
+    // Check whether all fields are of the same type
+    List<PrimitiveTypeName> typeNames = types.stream()
+        .map(t -> t.asPrimitiveType().getPrimitiveTypeName())
+        .toList();
+    PrimitiveTypeName typeName = typeNames.get(0);
+    if (!typeNames.stream().allMatch(typeName::equals)) {
+      return null;
+    }
+
+    // Check whether all fields are double
+    if (typeName == PrimitiveTypeName.DOUBLE) {
+      return FilterApi.and(
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.doubleColumn("bbox.xmin"),
+                  envelope.getMinX()),
+              FilterApi.ltEq(
+                  FilterApi.doubleColumn("bbox.xmax"),
+                  envelope.getMaxX())),
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.doubleColumn("bbox.ymin"),
+                  envelope.getMinY()),
+              FilterApi.ltEq(
+                  FilterApi.doubleColumn("bbox.ymax"),
+                  envelope.getMaxY())));
+    }
+
+    // Check whether all fields are float
+    if (typeName == PrimitiveTypeName.FLOAT) {
+      return FilterApi.and(
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.floatColumn("bbox.xmin"),
+                  (float) envelope.getMinX()),
+              FilterApi.ltEq(
+                  FilterApi.floatColumn("bbox.xmax"),
+                  (float) envelope.getMaxX())),
+          FilterApi.and(
+              FilterApi.gtEq(
+                  FilterApi.floatColumn("bbox.ymin"),
+                  (float) envelope.getMinY()),
+              FilterApi.ltEq(
+                  FilterApi.floatColumn("bbox.ymax"),
+                  (float) envelope.getMaxY())));
+    }
+
+    return null;
+  }
+
   private void advanceToNextRowGroup() throws IOException {
     if (currentRowGroup >= fileReader.getRowGroups().size()) {
       setupReaderForNextFile();
@@ -114,7 +201,11 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
 
     GeoParquetGroupRecordMaterializer materializer =
         new GeoParquetGroupRecordMaterializer(schema, metadata);
-    recordReader = columnIO.getRecordReader(pages, materializer, FilterCompat.NOOP);
+
+    FilterPredicate envelopeFilter = createEnvelopeFilter(schema, envelope);
+    Filter filter = envelopeFilter == null ? FilterCompat.NOOP : FilterCompat.get(envelopeFilter);
+
+    recordReader = columnIO.getRecordReader(pages, materializer, filter);
     currentRowGroup++;
   }
 
@@ -132,13 +223,11 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
         }
 
         GeoParquetGroup group = recordReader.read();
-        if (group == null) {
-          // Should not happen unless there is an error
-          throw new GeoParquetException("Unexpected null group read from recordReader.");
+        rowsReadInGroup++;
+        if (group != null) {
+          action.accept(group);
         }
 
-        rowsReadInGroup++;
-        action.accept(group);
         return true;
       }
     } catch (IOException e) {
@@ -166,7 +255,8 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> {
       return null;
     }
     int mid = fileStartIndex + remainingFiles / 2;
-    GeoParquetSpliterator split = new GeoParquetSpliterator(files, configuration, mid, fileEndIndex);
+    GeoParquetSpliterator split =
+        new GeoParquetSpliterator(files, envelope, configuration, mid, fileEndIndex);
     this.fileEndIndex = mid;
     return split;
   }
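For reference, a standalone sketch (not part of the patch) of the DOUBLE branch of the new createEnvelopeFilter method, showing how the Parquet FilterApi predicate over the nested bbox fields is composed and wrapped with FilterCompat.get before being handed to getRecordReader; the class and method names below are illustrative only.

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.locationtech.jts.geom.Envelope;

class BboxPredicateSketch {

  // Same shape of predicate as the DOUBLE case in createEnvelopeFilter:
  // bbox.xmin >= minX AND bbox.xmax <= maxX AND bbox.ymin >= minY AND bbox.ymax <= maxY.
  // The dotted paths ("bbox.xmin", ...) address the primitive fields nested in the bbox group.
  static Filter toFilter(Envelope envelope) {
    FilterPredicate predicate = FilterApi.and(
        FilterApi.and(
            FilterApi.gtEq(FilterApi.doubleColumn("bbox.xmin"), envelope.getMinX()),
            FilterApi.ltEq(FilterApi.doubleColumn("bbox.xmax"), envelope.getMaxX())),
        FilterApi.and(
            FilterApi.gtEq(FilterApi.doubleColumn("bbox.ymin"), envelope.getMinY()),
            FilterApi.ltEq(FilterApi.doubleColumn("bbox.ymax"), envelope.getMaxY())));
    // FilterCompat.get wraps the predicate in the Filter type expected by getRecordReader.
    return FilterCompat.get(predicate);
  }
}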
diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
index 96628722..d87429b4 100644
--- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
+++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
@@ -26,12 +26,16 @@ import java.util.stream.Stream;
 import org.apache.baremaps.testing.TestFiles;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.locationtech.jts.geom.Envelope;
 
 class GeoParquetReaderTest {
 
-  private static void readGroups(URI geoParquet, boolean parallel,
-      int expectedGroupCount) {
-    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+  private static void readGroups(
+      URI geoParquet,
+      Envelope envelope,
+      boolean parallel,
+      int expectedGroupCount) {
+    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, envelope);
     final AtomicInteger groupCount = new AtomicInteger();
     Stream<GeoParquetGroup> geoParquetGroupStream;
     if (parallel) {
@@ -48,7 +52,15 @@ class GeoParquetReaderTest {
     URI geoParquet = TestFiles.GEOPARQUET.toUri();
     final boolean isParallel = false;
     final int expectedGroupCount = 5;
-    readGroups(geoParquet, isParallel, expectedGroupCount);
+    readGroups(geoParquet, null, isParallel, expectedGroupCount);
+  }
+
+  @Test
+  void readFiltered() {
+    URI geoParquet = TestFiles.GEOPARQUET.toUri();
+    final boolean isParallel = false;
+    final int expectedGroupCount = 1;
+    readGroups(geoParquet, new Envelope(-172, -65, 18, 72), isParallel, expectedGroupCount);
   }
 
   @Disabled("Requires access to the Internet")
@@ -59,7 +71,7 @@ class GeoParquetReaderTest {
     final boolean isParallel = true;
     final int expectedGroupCount = 974708;
 
-    readGroups(geoParquet, isParallel, expectedGroupCount);
+    readGroups(geoParquet, null, isParallel, expectedGroupCount);
   }
 
   @Disabled("Requires access to the Internet")
