This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 78b464e48e7614c19e7001645f185f06d942e479 Author: Bertil Chapuis <[email protected]> AuthorDate: Sat Oct 5 22:14:05 2024 +0200 Improve the geoparquet reader --- .../storage/geoparquet/GeoParquetDataTable.java | 17 ++--- .../geoparquet/GeoParquetGroupSpliterator.java | 2 - .../baremaps/geoparquet/GeoParquetReader.java | 57 ++++++++-------- ...java => GeoParquetGroupRecordMaterializer.java} | 4 +- .../geoparquet/data/GeoParquetMetadata.java | 78 ++++++++++++++++++++++ .../hadoop/GeoParquetGroupReadSupport.java | 15 +++-- .../baremaps/geoparquet/GeoParquetReaderTest.java | 1 - 7 files changed, 124 insertions(+), 50 deletions(-) diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java index 225ce61c..79836b2e 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java @@ -35,18 +35,12 @@ public class GeoParquetDataTable implements DataTable { public GeoParquetDataTable(URI path) { this.path = path; - } - - private GeoParquetReader reader() { - if (reader == null) { - reader = new GeoParquetReader(path); - } - return reader; + this.reader = new GeoParquetReader(path); } @Override public long size() { - return reader().size(); + return reader.size(); } @Override @@ -66,7 +60,7 @@ public class GeoParquetDataTable implements DataTable { @Override public Stream<DataRow> parallelStream() { - return reader().readParallel().map(group -> new DataRowImpl( + return reader.readParallel().map(group -> new DataRowImpl( GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()), GeoParquetTypeConversion.asRowValues(group))); } @@ -76,7 +70,6 @@ public class GeoParquetDataTable implements DataTable { if (reader != null) { reader = null; } - if (schema != null) { schema = null; } @@ -87,7 +80,7 @@ public class GeoParquetDataTable implements DataTable { if (schema == null) { this.schema = GeoParquetTypeConversion.asSchema( path.toString(), - reader().getGeoParquetSchema()); + reader.getGeoParquetSchema()); return this.schema; } return schema; @@ -95,7 +88,7 @@ public class GeoParquetDataTable implements DataTable { public int srid(String column) { try { - return reader().getGeoParquetMetadata().getSrid(column); + return reader.getGeoParquetMetadata().getSrid(column); } catch (Exception e) { throw new GeoParquetException("Fail to read the SRID from the GeoParquet metadata", e); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java index 62f6e927..0fe0702a 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java @@ -50,8 +50,6 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { setupReaderForNextFile(); } - - private void setupReaderForNextFile() { closeCurrentReader(); diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 994609ee..41a41db2 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -37,6 +37,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; /** @@ -103,7 +105,33 @@ public class GeoParquetReader { private FileInfo getFileInfo(FileStatus fileStatus) { try { - return buildFileInfo(fileStatus); + long recordCount; + MessageType messageType; + Map<String, String> keyValueMetadata; + + ParquetMetadata parquetMetadata = + ParquetFileReader.readFooter(configuration, fileStatus.getPath()); + recordCount = parquetMetadata.getBlocks().stream() + .mapToLong(BlockMetaData::getRowCount) + .sum(); + + messageType = parquetMetadata.getFileMetaData().getSchema(); + keyValueMetadata = parquetMetadata.getFileMetaData().getKeyValueMetaData(); + + GeoParquetMetadata geoParquetMetadata = null; + Schema geoParquetSchema = null; + + if (keyValueMetadata.containsKey("geo")) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + geoParquetMetadata = + objectMapper.readValue(keyValueMetadata.get("geo"), GeoParquetMetadata.class); + geoParquetSchema = + GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); + } + + return new FileInfo(fileStatus, recordCount, keyValueMetadata, messageType, + geoParquetMetadata, geoParquetSchema); } catch (IOException e) { throw new GeoParquetException("Failed to build FileInfo for file: " + fileStatus, e); } @@ -145,33 +173,6 @@ public class GeoParquetReader { return groupCount.get(); } - private FileInfo buildFileInfo(FileStatus file) throws IOException { - long recordCount; - MessageType messageType; - Map<String, String> keyValueMetadata; - - try (ParquetFileReader reader = ParquetFileReader.open(configuration, file.getPath())) { - recordCount = reader.getRecordCount(); - messageType = reader.getFileMetaData().getSchema(); - keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData(); - } - - GeoParquetMetadata geoParquetMetadata = null; - Schema geoParquetSchema = null; - - if (keyValueMetadata.containsKey("geo")) { - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - geoParquetMetadata = - objectMapper.readValue(keyValueMetadata.get("geo"), GeoParquetMetadata.class); - geoParquetSchema = - GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); - } - - return new FileInfo(file, recordCount, keyValueMetadata, messageType, - geoParquetMetadata, geoParquetSchema); - } - public Stream<GeoParquetGroup> readParallel() { return retrieveGeoParquetGroups(true); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java similarity index 92% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java index 01cfc871..5813633d 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java @@ -36,13 +36,13 @@ import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetGroupRecordConverter extends RecordMaterializer<GeoParquetGroup> { +public class GeoParquetGroupRecordMaterializer extends RecordMaterializer<GeoParquetGroup> { private final GeoParquetGroupFactory groupFactory; private final GeoParquetGroupConverter root; - public GeoParquetGroupRecordConverter(MessageType schema, GeoParquetMetadata metadata) { + public GeoParquetGroupRecordMaterializer(MessageType schema, GeoParquetMetadata metadata) { this.groupFactory = new GeoParquetGroupFactory(schema, metadata); this.root = new GeoParquetGroupConverter(null, 0, schema) { @Override diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java index fe3955d7..6e34931c 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java @@ -20,6 +20,7 @@ package org.apache.baremaps.geoparquet.data; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Objects; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -34,6 +35,27 @@ public class GeoParquetMetadata { @JsonProperty("columns") private Map<String, GeoParquetColumnMetadata> columns; + @JsonProperty("encoding") + private String encoding; + + @JsonProperty("geometry_types") + private List<String> geometryTypes; + + @JsonProperty("crs") + private Object crs; + + @JsonProperty("edges") + private String edges; + + @JsonProperty("bbox") + private List<Double> bbox; + + @JsonProperty("epoch") + private String epoch; + + @JsonProperty("covering") + private Object covering; + public String getVersion() { return version; } @@ -58,6 +80,62 @@ public class GeoParquetMetadata { this.columns = columns; } + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + + public List<String> getGeometryTypes() { + return geometryTypes; + } + + public void setGeometryTypes(List<String> geometryTypes) { + this.geometryTypes = geometryTypes; + } + + public Object getCrs() { + return crs; + } + + public void setCrs(Object crs) { + this.crs = crs; + } + + public String getEdges() { + return edges; + } + + public void setEdges(String edges) { + this.edges = edges; + } + + public List<Double> getBbox() { + return bbox; + } + + public void setBbox(List<Double> bbox) { + this.bbox = bbox; + } + + public String getEpoch() { + return epoch; + } + + public void setEpoch(String epoch) { + this.epoch = epoch; + } + + public Object getCovering() { + return covering; + } + + public void setCovering(Object covering) { + this.covering = covering; + } + public int getSrid(String column) { return Optional.ofNullable(getColumns().get(column).getCrs()).map(crs -> { JsonNode id = crs.get("id"); diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java index 21dfc376..887d7471 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Map; import org.apache.baremaps.geoparquet.GeoParquetException; import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordConverter; +import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordMaterializer; import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.api.ReadSupport; @@ -32,9 +32,12 @@ import org.apache.parquet.schema.MessageType; public class GeoParquetGroupReadSupport extends ReadSupport<GeoParquetGroup> { + public GeoParquetGroupReadSupport() {} + @Override public ReadContext init( - Configuration configuration, Map<String, String> keyValueMetaData, + Configuration configuration, + Map<String, String> keyValueMetaData, MessageType fileSchema) { String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); @@ -42,8 +45,10 @@ public class GeoParquetGroupReadSupport extends ReadSupport<GeoParquetGroup> { } @Override - public RecordMaterializer<GeoParquetGroup> prepareForRead(Configuration configuration, - Map<String, String> keyValueMetaData, MessageType fileSchema, + public RecordMaterializer<GeoParquetGroup> prepareForRead( + Configuration configuration, + Map<String, String> keyValueMetaData, + MessageType fileSchema, ReadContext readContext) { // Read the GeoParquet metadata of the Parquet file @@ -52,7 +57,7 @@ public class GeoParquetGroupReadSupport extends ReadSupport<GeoParquetGroup> { GeoParquetMetadata metadata = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .readValue(json, GeoParquetMetadata.class); - return new GeoParquetGroupRecordConverter(readContext.getRequestedSchema(), metadata); + return new GeoParquetGroupRecordMaterializer(readContext.getRequestedSchema(), metadata); } catch (JsonProcessingException e) { throw new GeoParquetException("Failed to read GeoParquet's metadata of the Parquet file", e); } diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index 6d942a55..f0c38ab4 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -35,7 +35,6 @@ class GeoParquetReaderTest { URI geoParquet = TestFiles.GEOPARQUET.toUri(); final boolean isParallel = false; final int expectedGroupCount = 5; - readGroups(geoParquet, isParallel, expectedGroupCount); }
