This is an automated email from the ASF dual-hosted git repository.

bchapuis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a17df2c File-based SplitIterator (#866)
9a17df2c is described below

commit 9a17df2c17c97f22d4cec3fe0b121ef7d7b5d7d1
Author: sebr72 <[email protected]>
AuthorDate: Sat Jun 8 10:41:30 2024 +0200

    File-based SplitIterator (#866)
    
    * Separate schema validation, and add more tests
    
    * Improve CPU utilisation
---
 .../storage/geoparquet/GeoParquetDataTable.java    |  28 ++----
 .../geoparquet/GeoParquetGroupSpliterator.java     | 101 +++++++++------------
 .../baremaps/geoparquet/GeoParquetReader.java      |  80 +++++++++-------
 .../baremaps/geoparquet/GeoParquetReaderTest.java  |  61 +++++++++----
 4 files changed, 138 insertions(+), 132 deletions(-)

diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
index f8b3cf81..205003d1 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
@@ -17,9 +17,7 @@
 
 package org.apache.baremaps.storage.geoparquet;
 
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Iterator;
 import java.util.Spliterator;
 import java.util.stream.Stream;
@@ -49,11 +47,7 @@ public class GeoParquetDataTable implements DataTable {
 
   @Override
   public long size() {
-    try {
-      return reader().size();
-    } catch (URISyntaxException e) {
-      throw new GeoParquetException("Fail to access size from reader", e);
-    }
+    return reader().size();
   }
 
   @Override
@@ -73,13 +67,9 @@ public class GeoParquetDataTable implements DataTable {
 
   @Override
   public Stream<DataRow> parallelStream() {
-    try {
-      return reader().read().map(group -> new DataRowImpl(
-          GeoParquetTypeConversion.asSchema(path.toString(), 
group.getSchema()),
-          GeoParquetTypeConversion.asRowValues(group)));
-    } catch (IOException | URISyntaxException e) {
-      throw new GeoParquetException("Fail to read() the reader", e);
-    }
+    return reader().readParallel().map(group -> new DataRowImpl(
+        GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
+        GeoParquetTypeConversion.asRowValues(group)));
   }
 
   @Override
@@ -96,13 +86,9 @@ public class GeoParquetDataTable implements DataTable {
   @Override
   public DataSchema schema() {
     if (schema == null) {
-      try {
-        Schema schema = reader().getGeoParquetSchema();
-        this.schema = GeoParquetTypeConversion.asSchema(path.toString(), 
schema);
-        return this.schema;
-      } catch (URISyntaxException e) {
-        throw new GeoParquetException("Failed to get the schema.", e);
-      }
+      Schema schema = reader().getGeoParquetSchema();
+      this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
+      return this.schema;
     }
     return schema;
   }
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
index 465b1226..7a86a386 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
@@ -18,60 +18,50 @@
 package org.apache.baremaps.geoparquet;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Spliterator;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.*;
 import java.util.function.Consumer;
 import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
 import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.parquet.hadoop.ParquetReader;
 
-public class GeoParquetGroupSpliterator implements 
Spliterator<GeoParquetGroup> {
+class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
 
   private final GeoParquetReader geoParquetReader;
-  private final Queue<FileStatus> queue;
-  private final Map<FileStatus, GeoParquetReader.FileInfo> files;
-  private FileStatus fileStatus = null;
+  private final List<FileStatus> fileStatuses;
   private ParquetReader<GeoParquetGroup> reader;
 
-  GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
-      Map<FileStatus, GeoParquetReader.FileInfo> files) {
+  GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, 
List<FileStatus> files) {
     this.geoParquetReader = geoParquetReader;
-    this.files = files;
-    this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, 
files.keySet());
+    this.fileStatuses = Collections.synchronizedList(files);
+    setupReaderForNextFile();
+  }
+
+  private void setupReaderForNextFile() {
+    FileStatus fileStatus = fileStatuses.remove(0);
+    try {
+      reader = createParquetReader(fileStatus);
+    } catch (IOException e) {
+      throw new GeoParquetException("Failed to create reader for " + 
fileStatus, e);
+    }
   }
 
   @Override
   public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
     try {
-      // Poll the next file
-      if (fileStatus == null) {
-        fileStatus = queue.poll();
-      }
-
-      // If there are no more files, return false
-      if (fileStatus == null) {
-        return false;
-      }
-
-      // Create a new reader if it does not exist
-      if (reader == null) {
-        reader = createParquetReader(fileStatus);
-      }
-
       // Read the next group
       GeoParquetGroup group = reader.read();
 
-      // If the group is null, close the resources and set the variables to 
null
-      if (group == null) {
-        reader.close();
-        reader = null;
-        fileStatus = null;
-
-        // Try to advance again
-        return tryAdvance(action);
+      // If the group is null, try to get the one from the next file.
+      while (group == null) {
+        synchronized (fileStatuses) {
+          if (fileStatuses.isEmpty()) {
+            reader.close();
+            return false;
+          }
+          setupReaderForNextFile();
+        }
+        group = reader.read();
       }
 
       // Accept the group and tell the caller that there are more groups to 
read
@@ -80,13 +70,10 @@ public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup>
 
     } catch (IOException e) {
       // If an exception occurs, try to close the resources and throw a 
runtime exception
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException e2) {
-          // Ignore the exception as the original exception is more important
-        }
-        reader = null;
+      try {
+        reader.close();
+      } catch (IOException e2) {
+        // Ignore the exception as the original exception is more important
       }
       throw new GeoParquetException("IOException caught while trying to read 
the next file.", e);
     }
@@ -102,34 +89,30 @@ public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup>
 
   @Override
   public Spliterator<GeoParquetGroup> trySplit() {
-    if (queue.size() < 2) {
-      // There is nothing left to split
-      return null;
-    }
-
-    // Create a new spliterator by polling the next polledFileStatus
-    FileStatus polledFileStatus = queue.poll();
+    List<FileStatus> sublist;
+    synchronized (fileStatuses) {
+      if (fileStatuses.size() < 2) {
+        // There is nothing left to split
+        return null;
+      }
 
-    // If there are no more files, tell the caller that there is nothing to 
split anymore
-    if (polledFileStatus == null) {
-      return null;
+      sublist = fileStatuses.subList(0, fileStatuses.size() / 2);
     }
+    List<FileStatus> secondList = new ArrayList<>(sublist);
+    sublist.clear();
 
-    // Return a new spliterator with the polledFileStatus
-    return new GeoParquetGroupSpliterator(geoParquetReader,
-        Map.of(polledFileStatus, files.get(polledFileStatus)));
+    // Return a new spliterator with the sublist
+    return new GeoParquetGroupSpliterator(geoParquetReader, secondList);
   }
 
   @Override
   public long estimateSize() {
-    return files.values().stream()
-        .map(GeoParquetReader.FileInfo::recordCount)
-        .reduce(0L, Long::sum);
+    return geoParquetReader.size();
   }
 
   @Override
   public int characteristics() {
     // The spliterator is not ordered, or sorted
-    return NONNULL | IMMUTABLE | SIZED | DISTINCT;
+    return NONNULL | CONCURRENT | DISTINCT;
   }
 }
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index e88f2c61..03a07e68 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
@@ -45,10 +45,9 @@ import org.apache.parquet.schema.MessageType;
 public class GeoParquetReader {
 
   private final URI uri;
-
   final Configuration configuration;
-
-  private Map<FileStatus, FileInfo> files;
+  private List<FileStatus> files;
+  private Long groupCount;
 
   record FileInfo(FileStatus file, Long recordCount, Map<String, String> 
keyValueMetadata,
       MessageType messageType, GeoParquetMetadata metadata,
@@ -64,51 +63,60 @@ public class GeoParquetReader {
     this.configuration = configuration;
   }
 
-  public MessageType getParquetSchema() throws URISyntaxException {
-    return files().values().stream()
+  public MessageType getParquetSchema() {
+    return files().stream()
         .findFirst()
+        .map(this::getFileInfo)
         .orElseThrow()
         .messageType();
   }
 
-  public GeoParquetMetadata getGeoParquetMetadata() throws URISyntaxException {
-    return files().values().stream()
+  private FileInfo getFileInfo(FileStatus fileStatus) {
+    try {
+      return buildFileInfo(fileStatus);
+    } catch (IOException e) {
+      throw new GeoParquetException("Failed to build Info", e);
+    }
+  }
+
+  public GeoParquetMetadata getGeoParquetMetadata() {
+    return files().stream()
         .findFirst()
+        .map(this::getFileInfo)
         .orElseThrow()
         .metadata();
   }
 
-  public Schema getGeoParquetSchema() throws URISyntaxException {
-    return files().values().stream()
+  public Schema getGeoParquetSchema() {
+    return files().stream()
         .findFirst()
+        .map(this::getFileInfo)
         .orElseThrow()
         .geoParquetSchema();
   }
 
-  public Long size() throws URISyntaxException {
-    return files().values().stream().map(FileInfo::recordCount).reduce(0L, 
Long::sum);
+  public boolean validateSchemasAreIdentical() {
+    // Verify that the files all have the same schema
+    final int messageTypeCount = 
files().stream().parallel().map(this::getFileInfo)
+        .map(FileInfo::messageType).collect(Collectors.toSet()).size();
+    return messageTypeCount == 1;
   }
 
-  private synchronized Map<FileStatus, FileInfo> files() {
+  public long size() {
+    if (groupCount == null) {
+      groupCount = 
files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount)
+          .reduce(0L, Long::sum);
+    }
+    return groupCount;
+  }
+
+  private synchronized List<FileStatus> files() {
     try {
       if (files == null) {
-        files = new HashMap<>();
-        FileSystem fs = FileSystem.get(uri, configuration);
-        FileStatus[] fileStatuses = fs.globStatus(new Path(uri));
-
-        for (FileStatus file : fileStatuses) {
-          files.put(file, buildFileInfo(file));
-        }
-
-        // Verify that the files all have the same schema
-        MessageType commonMessageType = null;
-        for (FileInfo entry : files.values()) {
-          if (commonMessageType == null) {
-            commonMessageType = entry.messageType;
-          } else if (!commonMessageType.equals(entry.messageType)) {
-            throw new GeoParquetException("The files do not have the same 
schema");
-          }
-        }
+        Path globPath = new Path(uri.getPath());
+        FileSystem fileSystem = FileSystem.get(uri, configuration);
+
+        files = new 
ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath)));
       }
     } catch (IOException e) {
       throw new GeoParquetException("IOException while attempting to list 
files.", e);
@@ -139,12 +147,16 @@ public class GeoParquetReader {
         geoParquetMetadata, geoParquetSchema);
   }
 
-  public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
-    return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), 
true);
+  public Stream<GeoParquetGroup> readParallel() {
+    return retrieveGeoParquetGroups(true);
+  }
+
+  private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) 
{
+    return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), 
inParallel);
   }
 
-  public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException 
{
-    return readParallel().sequential();
+  public Stream<GeoParquetGroup> read() {
+    return retrieveGeoParquetGroups(false);
   }
 
   private static Configuration createConfiguration() {
diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
index bd5dfa8f..618bf619 100644
--- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
+++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
@@ -19,10 +19,11 @@ package org.apache.baremaps.geoparquet;
 
 import static org.junit.jupiter.api.Assertions.*;
 
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
 import org.apache.baremaps.testing.TestFiles;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -30,38 +31,62 @@ import org.junit.jupiter.api.Test;
 class GeoParquetReaderTest {
 
   @Test
-  void read() throws IOException, URISyntaxException {
+  void read() {
     URI geoParquet = TestFiles.GEOPARQUET.toUri();
-    final boolean isPrintingContent = true;
+    final boolean isParallel = false;
     final int expectedGroupCount = 5;
 
-    readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+    readGroups(geoParquet, isParallel, expectedGroupCount);
   }
 
-  @Disabled("Takes too long. Around 10 minutes.")
+  @Disabled("Takes too long. Around 4 minutes.")
   @Test
-  void readExternal() throws IOException, URISyntaxException {
+  void readExternal() throws URISyntaxException {
     URI geoParquet = new URI(
         
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
-    final boolean isPrintingContent = false;
+    final boolean isParallel = true;
     final int expectedGroupCount = 974708;
 
-    readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+    readGroups(geoParquet, isParallel, expectedGroupCount);
   }
 
-  private static void readGroups(URI geoParquet, boolean isPrintingContent, 
int expectedGroupCount)
-      throws IOException, URISyntaxException {
+  private static void readGroups(URI geoParquet, boolean parallel,
+      int expectedGroupCount) {
     GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
     final AtomicInteger groupCount = new AtomicInteger();
-    geoParquetReader.read().forEach(group -> {
-      groupCount.getAndIncrement();
-      if (isPrintingContent) {
-        System.out.println("-----");
-        System.out.println(group.getSchema());
-        System.out.println(group.getGeometryValue("geometry"));
-      }
-    });
+    Stream<GeoParquetGroup> geoParquetGroupStream;
+    if (parallel) {
+      geoParquetGroupStream = geoParquetReader.readParallel();
+    } else {
+      geoParquetGroupStream = geoParquetReader.read();
+    }
+    geoParquetGroupStream.forEach(group -> groupCount.getAndIncrement());
 
     assertEquals(expectedGroupCount, groupCount.get());
   }
+
+  @Test
+  void validateSchemas() throws URISyntaxException {
+    URI geoParquet = new URI(
+        
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet");
+
+    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+    assertTrue(geoParquetReader.validateSchemasAreIdentical(), "Schemas are 
identical");
+  }
+
+  @Test
+  void sizeForLocalities() throws URISyntaxException {
+    URI geoParquet = new URI(
+        
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
+    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+    assertEquals(974708L, geoParquetReader.size());
+  }
+
+  @Test
+  void sizeForBuildings() throws URISyntaxException {
+    URI geoParquet = new URI(
+        
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet");
+    GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+    assertEquals(2352441548L, geoParquetReader.size());
+  }
 }

Reply via email to