This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit a9a201e4c875e0be21ee4852c8bf2459ba8acae1 Author: Bertil Chapuis <[email protected]> AuthorDate: Wed Oct 9 14:12:10 2024 +0200 Pass properties instead of parent object --- .../baremaps/geoparquet/GeoParquetGroup.java | 4 +-- .../baremaps/geoparquet/GeoParquetReader.java | 2 +- .../baremaps/geoparquet/GeoParquetSpliterator.java | 32 +++++++++++++++------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java index 12972057..21d3bf2c 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java @@ -110,8 +110,8 @@ public class GeoParquetGroup { private void addValue(int fieldIndex, Object value) { Object currentValue = data[fieldIndex]; - if (currentValue instanceof List<?>list) { - ((List<Object>) list).add(value); + if (currentValue instanceof List<?>) { + ((List<Object>) currentValue).add(value); } else { data[fieldIndex] = value; } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 74981a77..e2c94fc3 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -157,7 +157,7 @@ public class GeoParquetReader { } private Stream<GeoParquetGroup> streamGeoParquetGroups(boolean inParallel) { - Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(this, 0, files.size()); + Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(files, configuration, 0, files.size()); return StreamSupport.stream(spliterator, inParallel); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java index 90e3d8cb..b73e8d19 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java @@ -20,10 +20,15 @@ package org.apache.baremaps.geoparquet; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.util.List; import java.util.Spliterator; import java.util.function.Consumer; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.util.HadoopInputFile; @@ -32,10 +37,13 @@ import org.apache.parquet.io.InputFile; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.io.RecordReader; import org.apache.parquet.schema.MessageType; +import org.locationtech.jts.geom.Envelope; class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { - private final GeoParquetReader reader; + private final List<FileStatus> files; + private final Configuration configuration; + private ParquetFileReader fileReader; private int fileStartIndex; private int fileEndIndex; @@ -47,10 +55,13 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { private long rowsReadInGroup; private long rowsInCurrentGroup; - GeoParquetSpliterator(GeoParquetReader reader, + GeoParquetSpliterator( + List<FileStatus> files, + Configuration configuration, int fileStartIndex, int fileEndIndex) { - this.reader = reader; + this.files = files; + this.configuration = configuration; this.fileStartIndex = fileStartIndex; this.fileEndIndex = fileEndIndex; setupReaderForNextFile(); @@ -64,9 +75,9 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { return; } - FileStatus fileStatus = reader.files.get(fileStartIndex++); + FileStatus fileStatus = files.get(fileStartIndex++); try { - InputFile inputFile = HadoopInputFile.fromPath(fileStatus.getPath(), reader.configuration); + InputFile inputFile = HadoopInputFile.fromPath(fileStatus.getPath(), configuration); fileReader = ParquetFileReader.open(inputFile); FileMetaData fileMetaData = fileReader.getFooter().getFileMetaData(); @@ -92,7 +103,7 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { return; } - PageReadStore pages = fileReader.readNextRowGroup(); + PageReadStore pages = fileReader.readNextFilteredRowGroup(); if (pages == null) { setupReaderForNextFile(); return; @@ -103,7 +114,7 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { GeoParquetGroupRecordMaterializer materializer = new GeoParquetGroupRecordMaterializer(schema, metadata); - recordReader = columnIO.getRecordReader(pages, materializer); + recordReader = columnIO.getRecordReader(pages, materializer, FilterCompat.NOOP); currentRowGroup++; } @@ -141,9 +152,10 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { try { fileReader.close(); } catch (IOException e) { - // Ignore exceptions during close + throw new GeoParquetException("Failed to close ParquetFileReader.", e); + } finally { + fileReader = null; } - fileReader = null; } } @@ -154,7 +166,7 @@ class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { return null; } int mid = fileStartIndex + remainingFiles / 2; - GeoParquetSpliterator split = new GeoParquetSpliterator(reader, mid, fileEndIndex); + GeoParquetSpliterator split = new GeoParquetSpliterator(files, configuration, mid, fileEndIndex); this.fileEndIndex = mid; return split; }
