This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch geoparquet-filtering-and-simplification
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit ea1eb1163033d5230a358b86e03f872472179945
Author: Bertil Chapuis <[email protected]>
AuthorDate: Mon Sep 30 23:45:46 2024 +0200

    Improve the geoparquet reader
---
 .../geoparquet/GeoParquetGroupSpliterator.java     |  33 ++++--
 .../baremaps/geoparquet/GeoParquetReader.java      | 121 ++++++++++++++-------
 2 files changed, 105 insertions(+), 49 deletions(-)

diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
index c17797f6..62f6e927 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.baremaps.geoparquet;
 
 import java.io.IOException;
@@ -22,10 +39,10 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
   }
 
   GeoParquetGroupSpliterator(
-          GeoParquetReader geoParquetReader,
-          List<FileStatus> fileStatuses,
-          int startIndex,
-          int endIndex) {
+      GeoParquetReader geoParquetReader,
+      List<FileStatus> fileStatuses,
+      int startIndex,
+      int endIndex) {
     this.geoParquetReader = geoParquetReader;
     this.fileStatuses = fileStatuses;
     this.currentFileIndex = startIndex;
@@ -87,10 +104,10 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
   }
 
   private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
-          throws IOException {
+      throws IOException {
     return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath())
-            .withConf(geoParquetReader.configuration)
-            .build();
+        .withConf(geoParquetReader.getConfiguration())
+        .build();
   }
 
   @Override
@@ -101,7 +118,7 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
     }
     int mid = currentFileIndex + remainingFiles / 2;
     GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator(
-            geoParquetReader, fileStatuses, mid, currentEndIndex);
+        geoParquetReader, fileStatuses, mid, currentEndIndex);
     this.currentEndIndex = mid;
     return split;
   }
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index 03a07e68..994609ee 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -38,20 +39,34 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 
-
 /**
  * This reader is based on the parquet example code located at: org.apache.parquet.example.data.*.
  */
 public class GeoParquetReader {
 
   private final URI uri;
-  final Configuration configuration;
-  private List<FileStatus> files;
-  private Long groupCount;
-
-  record FileInfo(FileStatus file, Long recordCount, Map<String, String> keyValueMetadata,
-      MessageType messageType, GeoParquetMetadata metadata,
-      GeoParquetGroup.Schema geoParquetSchema) {
+  private final Configuration configuration;
+  private final List<FileStatus> files;
+  private final AtomicLong groupCount = new AtomicLong(-1);
+
+  private static class FileInfo {
+    final FileStatus file;
+    final long recordCount;
+    final Map<String, String> keyValueMetadata;
+    final MessageType messageType;
+    final GeoParquetMetadata metadata;
+    final Schema geoParquetSchema;
+
+    FileInfo(FileStatus file, long recordCount, Map<String, String> keyValueMetadata,
+        MessageType messageType, GeoParquetMetadata metadata,
+        Schema geoParquetSchema) {
+      this.file = file;
+      this.recordCount = recordCount;
+      this.keyValueMetadata = keyValueMetadata;
+      this.messageType = messageType;
+      this.metadata = metadata;
+      this.geoParquetSchema = geoParquetSchema;
+    }
   }
 
   public GeoParquetReader(URI uri) {
@@ -61,80 +76,89 @@ public class GeoParquetReader {
   public GeoParquetReader(URI uri, Configuration configuration) {
     this.uri = uri;
     this.configuration = configuration;
+    this.files = initializeFiles();
+  }
+
+  private List<FileStatus> initializeFiles() {
+    try {
+      Path globPath = new Path(uri.getPath());
+      FileSystem fileSystem = FileSystem.get(uri, configuration);
+      FileStatus[] fileStatuses = fileSystem.globStatus(globPath);
+      if (fileStatuses == null) {
+        throw new GeoParquetException("No files found at the specified URI.");
+      }
+      return Collections.unmodifiableList(Arrays.asList(fileStatuses));
+    } catch (IOException e) {
+      throw new GeoParquetException("IOException while attempting to list files.", e);
+    }
   }
 
   public MessageType getParquetSchema() {
-    return files().stream()
+    return files.stream()
         .findFirst()
         .map(this::getFileInfo)
-        .orElseThrow()
-        .messageType();
+        .orElseThrow(
+            () -> new GeoParquetException("No files available to read schema.")).messageType;
   }
 
   private FileInfo getFileInfo(FileStatus fileStatus) {
     try {
       return buildFileInfo(fileStatus);
     } catch (IOException e) {
-      throw new GeoParquetException("Failed to build Info", e);
+      throw new GeoParquetException("Failed to build FileInfo for file: " + fileStatus, e);
     }
   }
 
   public GeoParquetMetadata getGeoParquetMetadata() {
-    return files().stream()
+    return files.stream()
         .findFirst()
         .map(this::getFileInfo)
-        .orElseThrow()
-        .metadata();
+        .orElseThrow(
+            () -> new GeoParquetException("No files available to read metadata.")).metadata;
   }
 
   public Schema getGeoParquetSchema() {
-    return files().stream()
+    return files.stream()
         .findFirst()
         .map(this::getFileInfo)
-        .orElseThrow()
-        .geoParquetSchema();
+        .orElseThrow(
+            () -> new GeoParquetException("No files available to read schema.")).geoParquetSchema;
   }
 
   public boolean validateSchemasAreIdentical() {
-    // Verify that the files all have the same schema
-    final int messageTypeCount = files().stream().parallel().map(this::getFileInfo)
-        .map(FileInfo::messageType).collect(Collectors.toSet()).size();
-    return messageTypeCount == 1;
+    // Verify that all files have the same schema
+    Set<MessageType> schemas = files.parallelStream()
+        .map(this::getFileInfo)
+        .map(fileInfo -> fileInfo.messageType)
+        .collect(Collectors.toSet());
+    return schemas.size() == 1;
   }
 
   public long size() {
-    if (groupCount == null) {
-      groupCount = files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount)
-          .reduce(0L, Long::sum);
+    if (groupCount.get() == -1) {
+      long totalCount = files.parallelStream()
+          .map(this::getFileInfo)
+          .mapToLong(fileInfo -> fileInfo.recordCount)
+          .sum();
+      groupCount.set(totalCount);
     }
-    return groupCount;
-  }
-
-  private synchronized List<FileStatus> files() {
-    try {
-      if (files == null) {
-        Path globPath = new Path(uri.getPath());
-        FileSystem fileSystem = FileSystem.get(uri, configuration);
-
-        files = new ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath)));
-      }
-    } catch (IOException e) {
-      throw new GeoParquetException("IOException while attempting to list files.", e);
-    }
-    return files;
+    return groupCount.get();
   }
 
   private FileInfo buildFileInfo(FileStatus file) throws IOException {
     long recordCount;
     MessageType messageType;
     Map<String, String> keyValueMetadata;
+
     try (ParquetFileReader reader = ParquetFileReader.open(configuration, file.getPath())) {
       recordCount = reader.getRecordCount();
       messageType = reader.getFileMetaData().getSchema();
       keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData();
     }
+
     GeoParquetMetadata geoParquetMetadata = null;
     Schema geoParquetSchema = null;
+
     if (keyValueMetadata.containsKey("geo")) {
       ObjectMapper objectMapper = new ObjectMapper();
       objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -143,6 +167,7 @@ public class GeoParquetReader {
       geoParquetSchema =
           GeoParquetGroupFactory.createGeoParquetSchema(messageType, geoParquetMetadata);
     }
+
     return new FileInfo(file, recordCount, keyValueMetadata, messageType,
         geoParquetMetadata, geoParquetSchema);
   }
@@ -152,7 +177,9 @@ public class GeoParquetReader {
   }
 
   private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) {
-    return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), inParallel);
+    Spliterator<GeoParquetGroup> spliterator =
+        new GeoParquetGroupSpliterator(this, files, 0, files.size());
+    return StreamSupport.stream(spliterator, inParallel);
   }
 
   public Stream<GeoParquetGroup> read() {
@@ -167,4 +194,16 @@ public class GeoParquetReader {
     conf.set("fs.s3a.path.style.access", "true");
     return conf;
   }
+
+  public URI getUri() {
+    return uri;
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public List<FileStatus> getFiles() {
+    return files;
+  }
 }

Reply via email to