This is an automated email from the ASF dual-hosted git repository.
bchapuis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
The following commit(s) were added to refs/heads/main by this push:
new cd43aaee Make SplitIterator an outer class and fix parallelism (#863)
cd43aaee is described below
commit cd43aaeeeb3d0b59bb7afc7f7910f34e85597986
Author: sebr72 <[email protected]>
AuthorDate: Wed Jun 5 09:57:42 2024 +0200
Make SplitIterator an outer class and fix parallelism (#863)
---
.../geoparquet/GeoParquetGroupSpliterator.java | 135 +++++++++++++++++++++
.../baremaps/geoparquet/GeoParquetReader.java | 112 +----------------
2 files changed, 138 insertions(+), 109 deletions(-)
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
new file mode 100644
index 00000000..465b1226
--- /dev/null
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.baremaps.geoparquet;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.hadoop.ParquetReader;
+
+public class GeoParquetGroupSpliterator implements
Spliterator<GeoParquetGroup> {
+
+ private final GeoParquetReader geoParquetReader;
+ private final Queue<FileStatus> queue;
+ private final Map<FileStatus, GeoParquetReader.FileInfo> files;
+ private FileStatus fileStatus = null;
+ private ParquetReader<GeoParquetGroup> reader;
+
+ GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
+ Map<FileStatus, GeoParquetReader.FileInfo> files) {
+ this.geoParquetReader = geoParquetReader;
+ this.files = files;
+ this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false,
files.keySet());
+ }
+
+ @Override
+ public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
+ try {
+ // Poll the next file
+ if (fileStatus == null) {
+ fileStatus = queue.poll();
+ }
+
+ // If there are no more files, return false
+ if (fileStatus == null) {
+ return false;
+ }
+
+ // Create a new reader if it does not exist
+ if (reader == null) {
+ reader = createParquetReader(fileStatus);
+ }
+
+ // Read the next group
+ GeoParquetGroup group = reader.read();
+
+ // If the group is null, close the resources and set the variables to
null
+ if (group == null) {
+ reader.close();
+ reader = null;
+ fileStatus = null;
+
+ // Try to advance again
+ return tryAdvance(action);
+ }
+
+ // Accept the group and tell the caller that there are more groups to
read
+ action.accept(group);
+ return true;
+
+ } catch (IOException e) {
+ // If an exception occurs, try to close the resources and throw a
runtime exception
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e2) {
+ // Ignore the exception as the original exception is more important
+ }
+ reader = null;
+ }
+ throw new GeoParquetException("IOException caught while trying to read
the next file.", e);
+ }
+ }
+
+ private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
+ throws IOException {
+ return ParquetReader
+ .builder(new GeoParquetGroupReadSupport(), file.getPath())
+ .withConf(geoParquetReader.configuration)
+ .build();
+ }
+
+ @Override
+ public Spliterator<GeoParquetGroup> trySplit() {
+ if (queue.size() < 2) {
+ // There is nothing left to split
+ return null;
+ }
+
+ // Create a new spliterator by polling the next polledFileStatus
+ FileStatus polledFileStatus = queue.poll();
+
+ // If there are no more files, tell the caller that there is nothing to
split anymore
+ if (polledFileStatus == null) {
+ return null;
+ }
+
+ // Return a new spliterator with the polledFileStatus
+ return new GeoParquetGroupSpliterator(geoParquetReader,
+ Map.of(polledFileStatus, files.get(polledFileStatus)));
+ }
+
+ @Override
+ public long estimateSize() {
+ return files.values().stream()
+ .map(GeoParquetReader.FileInfo::recordCount)
+ .reduce(0L, Long::sum);
+ }
+
+ @Override
+ public int characteristics() {
+ // The spliterator is not ordered, or sorted
+ return NONNULL | IMMUTABLE | SIZED | DISTINCT;
+ }
+}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index 1eae1417..e88f2c61 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -23,15 +23,12 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory;
import org.apache.baremaps.geoparquet.data.GeoParquetMetadata;
-import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -39,7 +36,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
@@ -50,11 +46,11 @@ public class GeoParquetReader {
private final URI uri;
- private final Configuration configuration;
+ final Configuration configuration;
private Map<FileStatus, FileInfo> files;
- private record FileInfo(FileStatus file, Long recordCount, Map<String,
String> keyValueMetadata,
+ record FileInfo(FileStatus file, Long recordCount, Map<String, String>
keyValueMetadata,
MessageType messageType, GeoParquetMetadata metadata,
GeoParquetGroup.Schema geoParquetSchema) {
}
@@ -144,115 +140,13 @@ public class GeoParquetReader {
}
public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
- return StreamSupport.stream(
- new GeoParquetGroupSpliterator(files()),
- true);
+ return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()),
true);
}
public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException
{
return readParallel().sequential();
}
- public class GeoParquetGroupSpliterator implements
Spliterator<GeoParquetGroup> { // nested version removed by #863 (moved to a top-level class)
-
- private final Queue<FileStatus> queue;
- private final Map<FileStatus, FileInfo> files;
-
- private FileStatus fileStatus;
-
- private ParquetReader<GeoParquetGroup> reader;
-
- GeoParquetGroupSpliterator(Map<FileStatus, FileInfo> files) {
- this.files = files;
- this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false,
files.keySet());
-
- }
-
- @Override
- public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
- try {
- // Poll the next file
- if (fileStatus == null) {
- fileStatus = queue.poll();
- }
-
- // If there are no more files, return false
- if (fileStatus == null) {
- return false;
- }
-
- // Create a new reader if it does not exist
- if (reader == null) {
- reader = createParquetReader(fileStatus);
- }
-
- // Read the next group
- GeoParquetGroup group = reader.read();
-
- // If the group is null, close the resources and set the variables to
null
- if (group == null) {
- reader.close();
- reader = null;
- fileStatus = null;
-
- // Try to advance again (recursion: one level per exhausted file)
- return tryAdvance(action);
- }
-
- // Accept the group and tell the caller that there are more groups to
read
- action.accept(group);
- return true;
-
- } catch (IOException e) {
- // If an exception occurs, try to close the resources and throw a
runtime exception (note: reader is not reset to null afterwards)
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e2) {
- // Ignore the exception as the original exception is more important
- }
- }
- throw new GeoParquetException("IOException caught while trying to read
the next file.", e);
- }
- }
-
- private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
- throws IOException {
- return ParquetReader
- .builder(new GeoParquetGroupReadSupport(), file.getPath())
- .withConf(configuration)
- .build();
- }
-
- @Override
- public Spliterator<GeoParquetGroup> trySplit() {
- // Create a new spliterator by polling the next file from the queue
- FileStatus polledFileStatus = queue.poll();
-
- // If there are no more files, tell the caller that there is nothing to
split anymore
- if (polledFileStatus == null) {
- return null;
- }
-
- // Return a new spliterator with the polledFileStatus
- return new GeoParquetGroupSpliterator(Map.of(polledFileStatus,
files.get(polledFileStatus)));
- }
-
- @Override
- public long estimateSize() {
- // Sum of the record counts of all files given to the constructor; files handed off by trySplit() are still counted
- return files.values().stream()
- .map(FileInfo::recordCount)
- .reduce(0L, Long::sum);
- }
-
- @Override
- public int characteristics() {
- // Only NONNULL and IMMUTABLE: the spliterator is not reported as sized, ordered, or sorted
- return Spliterator.NONNULL | Spliterator.IMMUTABLE;
- }
- }
-
private static Configuration createConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com");