This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch geoparquet-filtering-and-simplification
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git

commit e866f7625061e2c324b3e842dab600ae2d0d382d
Author: Bertil Chapuis <[email protected]>
AuthorDate: Mon Sep 30 22:20:21 2024 +0200

    Improve spliterator concurrency
---
 .../geoparquet/GeoParquetGroupSpliterator.java     | 129 +++++++++++----------
 1 file changed, 65 insertions(+), 64 deletions(-)

diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
index 7a86a386..c17797f6 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
@@ -1,24 +1,8 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.baremaps.geoparquet;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.List;
+import java.util.Spliterator;
 import java.util.function.Consumer;
 import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
 import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
@@ -29,16 +13,37 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
 
   private final GeoParquetReader geoParquetReader;
   private final List<FileStatus> fileStatuses;
+  private int currentFileIndex;
+  private int currentEndIndex;
   private ParquetReader<GeoParquetGroup> reader;
 
-  GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) {
+  public GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) {
+    this(geoParquetReader, files, 0, files.size());
+  }
+
+  GeoParquetGroupSpliterator(
+          GeoParquetReader geoParquetReader,
+          List<FileStatus> fileStatuses,
+          int startIndex,
+          int endIndex) {
     this.geoParquetReader = geoParquetReader;
-    this.fileStatuses = Collections.synchronizedList(files);
+    this.fileStatuses = fileStatuses;
+    this.currentFileIndex = startIndex;
+    this.currentEndIndex = endIndex;
     setupReaderForNextFile();
   }
 
+
+
   private void setupReaderForNextFile() {
-    FileStatus fileStatus = fileStatuses.remove(0);
+    closeCurrentReader();
+
+    if (currentFileIndex >= currentEndIndex) {
+      reader = null;
+      return;
+    }
+
+    FileStatus fileStatus = fileStatuses.get(currentFileIndex++);
     try {
       reader = createParquetReader(fileStatus);
     } catch (IOException e) {
@@ -46,73 +51,69 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
     }
   }
 
+  private void closeCurrentReader() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        // Ignore exceptions during close
+      }
+      reader = null;
+    }
+  }
+
   @Override
   public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
     try {
-      // Read the next group
-      GeoParquetGroup group = reader.read();
-
-      // If the group is null, try to get the one from the next file.
-      while (group == null) {
-        synchronized (fileStatuses) {
-          if (fileStatuses.isEmpty()) {
-            reader.close();
-            return false;
-          }
-          setupReaderForNextFile();
+      while (true) {
+        if (reader == null) {
+          return false;
         }
-        group = reader.read();
-      }
 
-      // Accept the group and tell the caller that there are more groups to read
-      action.accept(group);
-      return true;
+        GeoParquetGroup group = reader.read();
 
-    } catch (IOException e) {
-      // If an exception occurs, try to close the resources and throw a runtime exception
-      try {
-        reader.close();
-      } catch (IOException e2) {
-        // Ignore the exception as the original exception is more important
+        if (group == null) {
+          setupReaderForNextFile();
+          continue;
+        }
+
+        action.accept(group);
+        return true;
       }
+    } catch (IOException e) {
+      closeCurrentReader();
       throw new GeoParquetException("IOException caught while trying to read the next file.", e);
     }
   }
 
   private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)
-      throws IOException {
-    return ParquetReader
-        .builder(new GeoParquetGroupReadSupport(), file.getPath())
-        .withConf(geoParquetReader.configuration)
-        .build();
+          throws IOException {
+    return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath())
+            .withConf(geoParquetReader.configuration)
+            .build();
   }
 
   @Override
   public Spliterator<GeoParquetGroup> trySplit() {
-    List<FileStatus> sublist;
-    synchronized (fileStatuses) {
-      if (fileStatuses.size() < 2) {
-        // There is nothing left to split
-        return null;
-      }
-
-      sublist = fileStatuses.subList(0, fileStatuses.size() / 2);
+    int remainingFiles = currentEndIndex - currentFileIndex;
+    if (remainingFiles <= 1) {
+      return null;
     }
-    List<FileStatus> secondList = new ArrayList<>(sublist);
-    sublist.clear();
-
-    // Return a new spliterator with the sublist
-    return new GeoParquetGroupSpliterator(geoParquetReader, secondList);
+    int mid = currentFileIndex + remainingFiles / 2;
+    GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator(
+            geoParquetReader, fileStatuses, mid, currentEndIndex);
+    this.currentEndIndex = mid;
+    return split;
   }
 
   @Override
   public long estimateSize() {
-    return geoParquetReader.size();
+    // Return Long.MAX_VALUE as the actual number of elements is unknown
+    return Long.MAX_VALUE;
   }
 
   @Override
   public int characteristics() {
-    // The spliterator is not ordered, or sorted
-    return NONNULL | CONCURRENT | DISTINCT;
+    return NONNULL | IMMUTABLE;
   }
 }

Reply via email to