This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit c3734fea182a914105a115678626b29efaab994e Author: Bertil Chapuis <[email protected]> AuthorDate: Thu Oct 10 11:51:15 2024 +0200 Improve the creation of the filter predicate --- .../baremaps/geoparquet/GeoParquetSpliterator.java | 127 ++++++++++----------- .../baremaps/geoparquet/GeoParquetReaderTest.java | 10 +- 2 files changed, 63 insertions(+), 74 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java index 0a9ba79b..4bd2e718 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.List; import java.util.Spliterator; +import java.util.function.BiFunction; import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -61,11 +62,11 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { private long rowsInCurrentGroup; GeoParquetSpliterator( - List<FileStatus> files, - Envelope envelope, - Configuration configuration, - int fileStartIndex, - int fileEndIndex) { + List<FileStatus> files, + Envelope envelope, + Configuration configuration, + int fileStartIndex, + int fileEndIndex) { this.files = files; this.configuration = configuration; this.envelope = envelope; @@ -104,11 +105,36 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { } } + private void advanceToNextRowGroup() throws IOException { + if (currentRowGroup >= fileReader.getRowGroups().size()) { + setupReaderForNextFile(); + return; + } + + PageReadStore pages = fileReader.readNextFilteredRowGroup(); + if (pages == null) { + setupReaderForNextFile(); + return; + } + + rowsInCurrentGroup = 
pages.getRowCount(); + rowsReadInGroup = 0; + + GeoParquetGroupRecordMaterializer materializer = + new GeoParquetGroupRecordMaterializer(schema, metadata); + + FilterPredicate envelopeFilter = createEnvelopeFilter(schema, envelope); + Filter filter = envelopeFilter == null ? FilterCompat.NOOP : FilterCompat.get(envelopeFilter); + + recordReader = columnIO.getRecordReader(pages, materializer, filter); + currentRowGroup++; + } + private FilterPredicate createEnvelopeFilter(MessageType schema, Envelope envelope) { // Check whether the envelope is null or the world if (envelope == null - || envelope.isNull() - || envelope.equals(new Envelope(-180, 180, -90, 90))) { + || envelope.isNull() + || envelope.equals(new Envelope(-180, 180, -90, 90))) { return null; } @@ -143,70 +169,33 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { return null; } - // Check whether all fields are double - if (typeName == PrimitiveTypeName.DOUBLE) { - return FilterApi.and( - FilterApi.and( - FilterApi.gtEq( - FilterApi.doubleColumn("bbox.xmin"), - envelope.getMinX()), - FilterApi.ltEq( - FilterApi.doubleColumn("bbox.xmax"), - envelope.getMaxX())), - FilterApi.and( - FilterApi.gtEq( - FilterApi.doubleColumn("bbox.ymin"), - envelope.getMinY()), - FilterApi.ltEq( - FilterApi.doubleColumn("bbox.ymax"), - envelope.getMaxY()))); - } - - // Check whether all fields are float - if (typeName == PrimitiveTypeName.FLOAT) { - return FilterApi.and( - FilterApi.and( - FilterApi.gtEq( - FilterApi.floatColumn("bbox.xmin"), - (float) envelope.getMinX()), - FilterApi.ltEq( - FilterApi.floatColumn("bbox.xmax"), - (float) envelope.getMaxX())), - FilterApi.and( - FilterApi.gtEq( - FilterApi.floatColumn("bbox.ymin"), - (float) envelope.getMinY()), - FilterApi.ltEq( - FilterApi.floatColumn("bbox.ymax"), - (float) envelope.getMaxY()))); - } - - return null; - } - - private void advanceToNextRowGroup() throws IOException { - if (currentRowGroup >= fileReader.getRowGroups().size()) { - 
setupReaderForNextFile(); - return; - } - - PageReadStore pages = fileReader.readNextFilteredRowGroup(); - if (pages == null) { - setupReaderForNextFile(); - return; + // Check whether the type is a float or a double + if (typeName != PrimitiveTypeName.DOUBLE && typeName != PrimitiveTypeName.FLOAT) { + return null; } - rowsInCurrentGroup = pages.getRowCount(); - rowsReadInGroup = 0; - - GeoParquetGroupRecordMaterializer materializer = - new GeoParquetGroupRecordMaterializer(schema, metadata); - - FilterPredicate envelopeFilter = createEnvelopeFilter(schema, envelope); - Filter filter = envelopeFilter == null ? FilterCompat.NOOP : FilterCompat.get(envelopeFilter); - - recordReader = columnIO.getRecordReader(pages, materializer, filter); - currentRowGroup++; + // Initialize the lower-bound (>=) and upper-bound (<=) predicate creators for the type + BiFunction<String, Number, FilterPredicate> greaterThanOrEqual = + (column, value) -> switch (typeName) { + case DOUBLE -> FilterApi.gtEq(FilterApi.doubleColumn(column), value.doubleValue()); + case FLOAT -> FilterApi.gtEq(FilterApi.floatColumn(column), value.floatValue()); + default -> throw new IllegalStateException("Unexpected value: " + typeName); + }; + BiFunction<String, Number, FilterPredicate> lessThanOrEqual = + (column, value) -> switch (typeName) { + case DOUBLE -> FilterApi.ltEq(FilterApi.doubleColumn(column), value.doubleValue()); + case FLOAT -> FilterApi.ltEq(FilterApi.floatColumn(column), value.floatValue()); + default -> throw new IllegalStateException("Unexpected value: " + typeName); + }; + + // Create the filter predicate (min columns bounded below, max columns bounded above) + return FilterApi.and( + FilterApi.and( + greaterThanOrEqual.apply("bbox.xmin", envelope.getMinX()), + lessThanOrEqual.apply("bbox.xmax", envelope.getMaxX())), + FilterApi.and( + greaterThanOrEqual.apply("bbox.ymin", envelope.getMinY()), + lessThanOrEqual.apply("bbox.ymax", envelope.getMaxY()))); } @Override diff --git 
a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index d87429b4..885e8419 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -31,10 
+31,10 @@ import org.locationtech.jts.geom.Envelope; class GeoParquetReaderTest { private static void readGroups( - URI geoParquet, - Envelope envelope, - boolean parallel, - int expectedGroupCount) { + URI geoParquet, + Envelope envelope, + boolean parallel, + int expectedGroupCount) { GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, envelope); final AtomicInteger groupCount = new AtomicInteger(); Stream<GeoParquetGroup> geoParquetGroupStream; @@ -43,7 +43,7 @@ class GeoParquetReaderTest { } else { geoParquetGroupStream = geoParquetReader.read(); } - geoParquetGroupStream.peek(System.out::println).forEach(group -> groupCount.getAndIncrement()); + geoParquetGroupStream.forEach(group -> groupCount.getAndIncrement()); assertEquals(expectedGroupCount, groupCount.get()); }
