This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit ea1eb1163033d5230a358b86e03f872472179945 Author: Bertil Chapuis <[email protected]> AuthorDate: Mon Sep 30 23:45:46 2024 +0200 Improve the geoparquet reader --- .../geoparquet/GeoParquetGroupSpliterator.java | 33 ++++-- .../baremaps/geoparquet/GeoParquetReader.java | 121 ++++++++++++++------- 2 files changed, 105 insertions(+), 49 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java index c17797f6..62f6e927 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.baremaps.geoparquet; import java.io.IOException; @@ -22,10 +39,10 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { } GeoParquetGroupSpliterator( - GeoParquetReader geoParquetReader, - List<FileStatus> fileStatuses, - int startIndex, - int endIndex) { + GeoParquetReader geoParquetReader, + List<FileStatus> fileStatuses, + int startIndex, + int endIndex) { this.geoParquetReader = geoParquetReader; this.fileStatuses = fileStatuses; this.currentFileIndex = startIndex; @@ -87,10 +104,10 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { } private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file) - throws IOException { + throws IOException { return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(geoParquetReader.configuration) - .build(); + .withConf(geoParquetReader.getConfiguration()) + .build(); } @Override @@ -101,7 +118,7 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { } int mid = currentFileIndex + remainingFiles / 2; GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator( - geoParquetReader, fileStatuses, mid, currentEndIndex); + geoParquetReader, fileStatuses, mid, currentEndIndex); this.currentEndIndex = mid; return split; } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 03a07e68..994609ee 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.net.URI; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -38,20 +39,34 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; - /** * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*. */ public class GeoParquetReader { private final URI uri; - final Configuration configuration; - private List<FileStatus> files; - private Long groupCount; - - record FileInfo(FileStatus file, Long recordCount, Map<String, String> keyValueMetadata, - MessageType messageType, GeoParquetMetadata metadata, - GeoParquetGroup.Schema geoParquetSchema) { + private final Configuration configuration; + private final List<FileStatus> files; + private final AtomicLong groupCount = new AtomicLong(-1); + + private static class FileInfo { + final FileStatus file; + final long recordCount; + final Map<String, String> keyValueMetadata; + final MessageType messageType; + final GeoParquetMetadata metadata; + final Schema geoParquetSchema; + + FileInfo(FileStatus file, long recordCount, Map<String, String> keyValueMetadata, + MessageType messageType, GeoParquetMetadata metadata, + Schema geoParquetSchema) { + this.file = file; + this.recordCount = recordCount; + this.keyValueMetadata = keyValueMetadata; + this.messageType = messageType; + this.metadata = metadata; + this.geoParquetSchema = geoParquetSchema; + } } public GeoParquetReader(URI uri) { @@ -61,80 +76,89 @@ public class GeoParquetReader { public GeoParquetReader(URI uri, Configuration configuration) { this.uri = uri; this.configuration = configuration; + this.files = initializeFiles(); + } + + private List<FileStatus> initializeFiles() { + try { + Path globPath = new Path(uri.getPath()); + FileSystem fileSystem = FileSystem.get(uri, configuration); + FileStatus[] fileStatuses = fileSystem.globStatus(globPath); + if (fileStatuses == null) { + throw new GeoParquetException("No files found at the specified URI."); + } + return Collections.unmodifiableList(Arrays.asList(fileStatuses)); + } catch (IOException e) { + throw new GeoParquetException("IOException while attempting to list files.", e); + } } public MessageType getParquetSchema() { - return files().stream() + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() - .messageType(); + .orElseThrow( + () -> new GeoParquetException("No files available to read schema.")).messageType; } private FileInfo getFileInfo(FileStatus fileStatus) { try { return buildFileInfo(fileStatus); } catch (IOException e) { - throw new GeoParquetException("Failed to build Info", e); + throw new GeoParquetException("Failed to build FileInfo for file: " + fileStatus, e); } } public GeoParquetMetadata getGeoParquetMetadata() { - return files().stream() + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() - .metadata(); + .orElseThrow( + () -> new GeoParquetException("No files available to read metadata.")).metadata; } public Schema getGeoParquetSchema() { - return files().stream() + return files.stream() .findFirst() .map(this::getFileInfo) - .orElseThrow() - .geoParquetSchema(); + .orElseThrow( + () -> new GeoParquetException("No files available to read schema.")).geoParquetSchema; } public boolean validateSchemasAreIdentical() { - // Verify that the files all have the same schema - final int messageTypeCount = files().stream().parallel().map(this::getFileInfo) - .map(FileInfo::messageType).collect(Collectors.toSet()).size(); - return messageTypeCount == 1; + // Verify that all files have the same schema + Set<MessageType> schemas = files.parallelStream() + .map(this::getFileInfo) + .map(fileInfo -> fileInfo.messageType) + .collect(Collectors.toSet()); + return schemas.size() == 1; } public long size() { - if (groupCount == null) { - groupCount = files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount) - .reduce(0L, Long::sum); + if (groupCount.get() == -1) { + long totalCount = files.parallelStream() + .map(this::getFileInfo) + .mapToLong(fileInfo -> fileInfo.recordCount) + .sum(); + groupCount.set(totalCount); } - return groupCount; - } - - private synchronized List<FileStatus> files() { - try { - if (files == null) { - Path globPath = new Path(uri.getPath()); - FileSystem fileSystem = FileSystem.get(uri, configuration); - - files = new ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath))); - } - } catch (IOException e) { - throw new GeoParquetException("IOException while attempting to list files.", e); - } - return files; + return groupCount.get(); } private FileInfo buildFileInfo(FileStatus file) throws IOException { long recordCount; MessageType messageType; Map<String, String> keyValueMetadata; + try (ParquetFileReader reader = ParquetFileReader.open(configuration, file.getPath())) { recordCount = reader.getRecordCount(); messageType = reader.getFileMetaData().getSchema(); keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData(); } + GeoParquetMetadata geoParquetMetadata = null; Schema geoParquetSchema = null; + if (keyValueMetadata.containsKey("geo")) { ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -143,6 +167,7 @@ public class GeoParquetReader { geoParquetSchema = GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata); } + return new FileInfo(file, recordCount, keyValueMetadata, messageType, geoParquetMetadata, geoParquetSchema); } @@ -152,7 +177,9 @@ public class GeoParquetReader { } private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) { - return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), inParallel); + Spliterator<GeoParquetGroup> spliterator = + new GeoParquetGroupSpliterator(this, files, 0, files.size()); + return StreamSupport.stream(spliterator, inParallel); } public Stream<GeoParquetGroup> read() { @@ -167,4 +194,16 @@ public class GeoParquetReader { conf.set("fs.s3a.path.style.access", "true"); return conf; } + + public URI getUri() { + return uri; + } + + public Configuration getConfiguration() { + return configuration; + } + + public List<FileStatus> getFiles() { + return files; + } }
