This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 38e561c3d91d91a83339bb0333e28ddf965b0832 Author: Bertil Chapuis <[email protected]> AuthorDate: Sat Oct 5 23:20:28 2024 +0200 Make the spliterator an internal class --- .../storage/geoparquet/GeoParquetDataTable.java | 2 +- .../geoparquet/GeoParquetTypeConversion.java | 4 +- .../geoparquet/GeoParquetGroupSpliterator.java | 134 --------------- .../baremaps/geoparquet/GeoParquetReader.java | 186 +++++++++++++++------ .../baremaps/geoparquet/data/GeoParquetGroup.java | 14 +- .../geoparquet/data/GeoParquetGroupImpl.java | 10 +- 6 files changed, 148 insertions(+), 202 deletions(-) diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java index 79836b2e..90c78d1a 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java @@ -61,7 +61,7 @@ public class GeoParquetDataTable implements DataTable { @Override public Stream<DataRow> parallelStream() { return reader.readParallel().map(group -> new DataRowImpl( - GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()), + GeoParquetTypeConversion.asSchema(path.toString(), group.getGeoParquetSchema()), GeoParquetTypeConversion.asRowValues(group))); } diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java index 435effa5..eed61e8d 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java @@ -67,7 +67,7 @@ public class GeoParquetTypeConversion { public static List<Object> asRowValues(GeoParquetGroup group) { List<Object> values = new ArrayList<>(); - Schema schema = group.getSchema(); + Schema schema = group.getGeoParquetSchema(); List<Field> fields = schema.fields(); for (int i = 0; i < fields.size(); i++) { if (group.getValues(i).isEmpty()) { @@ -93,7 +93,7 @@ public class GeoParquetTypeConversion { public static Map<String, Object> asNested(GeoParquetGroup group) { Map<String, Object> nested = new HashMap<>(); - Schema schema = group.getSchema(); + Schema schema = group.getGeoParquetSchema(); List<Field> fields = schema.fields(); for (int i = 0; i < fields.size(); i++) { if (group.getValues(i).isEmpty()) { diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java deleted file mode 100644 index 0fe0702a..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet; - -import java.io.IOException; -import java.util.List; -import java.util.Spliterator; -import java.util.function.Consumer; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; -import org.apache.hadoop.fs.FileStatus; -import org.apache.parquet.hadoop.ParquetReader; - -class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { - - private final GeoParquetReader geoParquetReader; - private final List<FileStatus> fileStatuses; - private int currentFileIndex; - private int currentEndIndex; - private ParquetReader<GeoParquetGroup> reader; - - public GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) { - this(geoParquetReader, files, 0, files.size()); - } - - GeoParquetGroupSpliterator( - GeoParquetReader geoParquetReader, - List<FileStatus> fileStatuses, - int startIndex, - int endIndex) { - this.geoParquetReader = geoParquetReader; - this.fileStatuses = fileStatuses; - this.currentFileIndex = startIndex; - this.currentEndIndex = endIndex; - setupReaderForNextFile(); - } - - private void setupReaderForNextFile() { - closeCurrentReader(); - - if (currentFileIndex >= currentEndIndex) { - reader = null; - return; - } - - FileStatus fileStatus = fileStatuses.get(currentFileIndex++); - try { - reader = createParquetReader(fileStatus); - } catch (IOException e) { - throw new GeoParquetException("Failed to create reader for " + fileStatus, e); - } - } - - private void closeCurrentReader() { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - // Ignore exceptions during close - } - reader = null; - } - } - - @Override - public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) { - try { - while (true) { - if (reader == null) { - return false; - } - - GeoParquetGroup group = reader.read(); - - if (group == null) { - setupReaderForNextFile(); - continue; - } - - action.accept(group); - return true; - } - } catch (IOException e) { - closeCurrentReader(); - throw new GeoParquetException("IOException caught while trying to read the next file.", e); - } - } - - private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file) - throws IOException { - return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(geoParquetReader.getConfiguration()) - .build(); - } - - @Override - public Spliterator<GeoParquetGroup> trySplit() { - int remainingFiles = currentEndIndex - currentFileIndex; - if (remainingFiles <= 1) { - return null; - } - int mid = currentFileIndex + remainingFiles / 2; - GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator( - geoParquetReader, fileStatuses, mid, currentEndIndex); - this.currentEndIndex = mid; - return split; - } - - @Override - public long estimateSize() { - // Return Long.MAX_VALUE as the actual number of elements is unknown - return Long.MAX_VALUE; - } - - @Override - public int characteristics() { - return NONNULL | IMMUTABLE; - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 41a41db2..3a9a705b 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.URI; import java.util.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -30,6 +31,7 @@ import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory; import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; +import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -37,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; @@ -46,42 +49,20 @@ import org.apache.parquet.schema.MessageType; */ public class GeoParquetReader { - private final URI uri; private final Configuration configuration; private final List<FileStatus> files; private final AtomicLong groupCount = new AtomicLong(-1); - private static class FileInfo { - final FileStatus file; - final long recordCount; - final Map<String, String> keyValueMetadata; - final MessageType messageType; - final GeoParquetMetadata metadata; - final Schema geoParquetSchema; - - FileInfo(FileStatus file, long recordCount, Map<String, String> keyValueMetadata, - MessageType messageType, GeoParquetMetadata metadata, - Schema geoParquetSchema) { - this.file = file; - this.recordCount = recordCount; - this.keyValueMetadata = keyValueMetadata; - this.messageType = messageType; - this.metadata = metadata; - this.geoParquetSchema = geoParquetSchema; - } - } - public GeoParquetReader(URI uri) { - this(uri, createConfiguration()); + this(uri, createDefaultConfiguration()); } public GeoParquetReader(URI uri, Configuration configuration) { - this.uri = uri; this.configuration = configuration; - this.files = initializeFiles(); + this.files = initializeFiles(uri, configuration); } - private List<FileStatus> initializeFiles() { + private static List<FileStatus> initializeFiles(URI uri, Configuration configuration) { try { Path globPath = new Path(uri.getPath()); FileSystem fileSystem = FileSystem.get(uri, configuration); @@ -105,22 +86,18 @@ public class GeoParquetReader { private FileInfo getFileInfo(FileStatus fileStatus) { try { - long recordCount; - MessageType messageType; - Map<String, String> keyValueMetadata; - ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(configuration, fileStatus.getPath()); - recordCount = parquetMetadata.getBlocks().stream() + long recordCount = parquetMetadata.getBlocks().stream() .mapToLong(BlockMetaData::getRowCount) .sum(); - messageType = parquetMetadata.getFileMetaData().getSchema(); - keyValueMetadata = parquetMetadata.getFileMetaData().getKeyValueMetaData(); + MessageType messageType = parquetMetadata.getFileMetaData().getSchema(); + Map<String, String> keyValueMetadata = + parquetMetadata.getFileMetaData().getKeyValueMetaData(); GeoParquetMetadata geoParquetMetadata = null; Schema geoParquetSchema = null; - if (keyValueMetadata.containsKey("geo")) { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -130,8 +107,14 @@ public class GeoParquetReader { GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); } - return new FileInfo(fileStatus, recordCount, keyValueMetadata, messageType, - geoParquetMetadata, geoParquetSchema); + return new FileInfo( + fileStatus, + recordCount, + keyValueMetadata, + messageType, + geoParquetMetadata, + geoParquetSchema); + } catch (IOException e) { throw new GeoParquetException("Failed to build FileInfo for file: " + fileStatus, e); } @@ -141,16 +124,20 @@ public class GeoParquetReader { return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow( - () -> new GeoParquetException("No files available to read metadata.")).metadata; + .orElseThrow(this::noParquetFilesAvailable) + .metadata(); } public Schema getGeoParquetSchema() { return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow( - () -> new GeoParquetException("No files available to read schema.")).geoParquetSchema; + .orElseThrow(this::noParquetFilesAvailable) + .geoParquetSchema(); + } + + public GeoParquetException noParquetFilesAvailable() { + return new GeoParquetException("No parquet files available."); } public boolean validateSchemasAreIdentical() { @@ -173,13 +160,8 @@ public class GeoParquetReader { return groupCount.get(); } - public Stream<GeoParquetGroup> readParallel() { - return retrieveGeoParquetGroups(true); - } - private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) { - Spliterator<GeoParquetGroup> spliterator = - new GeoParquetGroupSpliterator(this, files, 0, files.size()); + Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(0, files.size()); return StreamSupport.stream(spliterator, inParallel); } @@ -187,7 +169,11 @@ public class GeoParquetReader { return retrieveGeoParquetGroups(false); } - private static Configuration createConfiguration() { + public Stream<GeoParquetGroup> readParallel() { + return retrieveGeoParquetGroups(true); + } + + private static Configuration createDefaultConfiguration() { Configuration conf = new Configuration(); conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com"); conf.set("fs.s3a.aws.credentials.provider", AnonymousAWSCredentialsProvider.class.getName()); @@ -196,15 +182,109 @@ public class GeoParquetReader { return conf; } - public URI getUri() { - return uri; - } + private record FileInfo( + FileStatus file, + long recordCount, + Map<String, String> keyValueMetadata, + MessageType messageType, + GeoParquetMetadata metadata, + Schema geoParquetSchema) { - public Configuration getConfiguration() { - return configuration; } - public List<FileStatus> getFiles() { - return files; + private class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { + + private int currentFileIndex; + private int currentEndIndex; + private ParquetReader<GeoParquetGroup> reader; + + GeoParquetSpliterator( + int startIndex, + int endIndex) { + this.currentFileIndex = startIndex; + this.currentEndIndex = endIndex; + setupReaderForNextFile(); + } + + private void setupReaderForNextFile() { + closeCurrentReader(); + + if (currentFileIndex >= currentEndIndex) { + reader = null; + return; + } + + FileStatus fileStatus = files.get(currentFileIndex++); + try { + reader = createParquetReader(fileStatus); + } catch (IOException e) { + throw new GeoParquetException("Failed to create reader for " + fileStatus, e); + } + } + + private void closeCurrentReader() { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + // Ignore exceptions during close + } + reader = null; + } + } + + @Override + public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) { + try { + while (true) { + if (reader == null) { + return false; + } + + GeoParquetGroup group = reader.read(); + + if (group == null) { + setupReaderForNextFile(); + continue; + } + + action.accept(group); + return true; + } + } catch (IOException e) { + closeCurrentReader(); + throw new GeoParquetException("IOException caught while trying to read the next file.", e); + } + } + + private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file) + throws IOException { + return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath()) + .withConf(configuration) + .build(); + } + + @Override + public Spliterator<GeoParquetGroup> trySplit() { + int remainingFiles = currentEndIndex - currentFileIndex; + if (remainingFiles <= 1) { + return null; + } + int mid = currentFileIndex + remainingFiles / 2; + GeoParquetSpliterator split = new GeoParquetSpliterator(mid, currentEndIndex); + this.currentEndIndex = mid; + return split; + } + + @Override + public long estimateSize() { + // Return Long.MAX_VALUE as the actual number of elements is unknown + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return NONNULL | IMMUTABLE; + } } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java index 46e57bb3..81366348 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java @@ -30,19 +30,19 @@ import org.locationtech.jts.geom.Geometry; public interface GeoParquetGroup { /** - * Returns the GeoParquet schema of the group built upon the Parquet schema and the GeoParquet - * metadata. + * Returns the Parquet schema of the group. * - * @return the GeoParquet schema + * @return the Parquet schema */ - Schema getSchema(); + GroupType getParquetSchema(); /** - * Returns the Parquet schema of the group. + * Returns the GeoParquet schema of the group built upon the Parquet schema and the GeoParquet + * metadata. * - * @return the Parquet schema + * @return the GeoParquet schema */ - GroupType getParquetSchema(); + Schema getGeoParquetSchema(); /** * Returns the GeoParquet metadata of the group. diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java index a108e383..2a41d531 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java @@ -288,7 +288,7 @@ public class GeoParquetGroupImpl implements GeoParquetGroup { } @Override - public Schema getSchema() { + public Schema getGeoParquetSchema() { return geoParquetSchema; } @@ -437,16 +437,16 @@ public class GeoParquetGroupImpl implements GeoParquetGroup { @Override public List<Envelope> getEnvelopeValues(int fieldIndex) { return getGroupValues(fieldIndex).stream().map(group -> { - double xMin = group.getSchema().fields().get(0).type().equals(Type.FLOAT) + double xMin = group.getGeoParquetSchema().fields().get(0).type().equals(Type.FLOAT) ? (double) group.getFloatValue(0) : group.getDoubleValue(0); - double yMin = group.getSchema().fields().get(1).type().equals(Type.FLOAT) + double yMin = group.getGeoParquetSchema().fields().get(1).type().equals(Type.FLOAT) ? (double) group.getFloatValue(1) : group.getDoubleValue(1); - double xMax = group.getSchema().fields().get(2).type().equals(Type.FLOAT) + double xMax = group.getGeoParquetSchema().fields().get(2).type().equals(Type.FLOAT) ? (double) group.getFloatValue(2) : group.getDoubleValue(2); - double yMax = group.getSchema().fields().get(0).type().equals(Type.FLOAT) + double yMax = group.getGeoParquetSchema().fields().get(0).type().equals(Type.FLOAT) ? (double) group.getFloatValue(3) : group.getDoubleValue(3); return new Envelope(xMin, xMax, yMin, yMax);
