This is an automated email from the ASF dual-hosted git repository.
bchapuis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
The following commit(s) were added to refs/heads/main by this push:
new 9a17df2c File-based SplitIterator (#866)
9a17df2c is described below
commit 9a17df2c17c97f22d4cec3fe0b121ef7d7b5d7d1
Author: sebr72 <[email protected]>
AuthorDate: Sat Jun 8 10:41:30 2024 +0200
File-based SplitIterator (#866)
* Separate schema validation, and add more tests
* Improve CPU utilisation
---
.../storage/geoparquet/GeoParquetDataTable.java | 28 ++----
.../geoparquet/GeoParquetGroupSpliterator.java | 101 +++++++++------------
.../baremaps/geoparquet/GeoParquetReader.java | 80 +++++++++-------
.../baremaps/geoparquet/GeoParquetReaderTest.java | 61 +++++++++----
4 files changed, 138 insertions(+), 132 deletions(-)
diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
index f8b3cf81..205003d1 100644
--- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
+++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java
@@ -17,9 +17,7 @@
package org.apache.baremaps.storage.geoparquet;
-import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.stream.Stream;
@@ -49,11 +47,7 @@ public class GeoParquetDataTable implements DataTable {
@Override
public long size() {
- try {
- return reader().size();
- } catch (URISyntaxException e) {
- throw new GeoParquetException("Fail to access size from reader", e);
- }
+ return reader().size();
}
@Override
@@ -73,13 +67,9 @@ public class GeoParquetDataTable implements DataTable {
@Override
public Stream<DataRow> parallelStream() {
- try {
- return reader().read().map(group -> new DataRowImpl(
- GeoParquetTypeConversion.asSchema(path.toString(),
group.getSchema()),
- GeoParquetTypeConversion.asRowValues(group)));
- } catch (IOException | URISyntaxException e) {
- throw new GeoParquetException("Fail to read() the reader", e);
- }
+ return reader().readParallel().map(group -> new DataRowImpl(
+ GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
+ GeoParquetTypeConversion.asRowValues(group)));
}
@Override
@@ -96,13 +86,9 @@ public class GeoParquetDataTable implements DataTable {
@Override
public DataSchema schema() {
if (schema == null) {
- try {
- Schema schema = reader().getGeoParquetSchema();
- this.schema = GeoParquetTypeConversion.asSchema(path.toString(),
schema);
- return this.schema;
- } catch (URISyntaxException e) {
- throw new GeoParquetException("Failed to get the schema.", e);
- }
+ Schema schema = reader().getGeoParquetSchema();
+ this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
+ return this.schema;
}
return schema;
}
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
index 465b1226..7a86a386 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java
@@ -18,60 +18,50 @@
package org.apache.baremaps.geoparquet;
import java.io.IOException;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Spliterator;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.*;
import java.util.function.Consumer;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.hadoop.ParquetReader;
-public class GeoParquetGroupSpliterator implements
Spliterator<GeoParquetGroup> {
+class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
private final GeoParquetReader geoParquetReader;
- private final Queue<FileStatus> queue;
- private final Map<FileStatus, GeoParquetReader.FileInfo> files;
- private FileStatus fileStatus = null;
+ private final List<FileStatus> fileStatuses;
private ParquetReader<GeoParquetGroup> reader;
- GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
- Map<FileStatus, GeoParquetReader.FileInfo> files) {
+ GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
List<FileStatus> files) {
this.geoParquetReader = geoParquetReader;
- this.files = files;
- this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false,
files.keySet());
+ this.fileStatuses = Collections.synchronizedList(files);
+ setupReaderForNextFile();
+ }
+
+ private void setupReaderForNextFile() {
+ FileStatus fileStatus = fileStatuses.remove(0);
+ try {
+ reader = createParquetReader(fileStatus);
+ } catch (IOException e) {
+ throw new GeoParquetException("Failed to create reader for " + fileStatus, e);
+ }
}
@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
- // Poll the next file
- if (fileStatus == null) {
- fileStatus = queue.poll();
- }
-
- // If there are no more files, return false
- if (fileStatus == null) {
- return false;
- }
-
- // Create a new reader if it does not exist
- if (reader == null) {
- reader = createParquetReader(fileStatus);
- }
-
// Read the next group
GeoParquetGroup group = reader.read();
- // If the group is null, close the resources and set the variables to
null
- if (group == null) {
- reader.close();
- reader = null;
- fileStatus = null;
-
- // Try to advance again
- return tryAdvance(action);
+ // If the group is null, try to get the one from the next file.
+ while (group == null) {
+ synchronized (fileStatuses) {
+ if (fileStatuses.isEmpty()) {
+ reader.close();
+ return false;
+ }
+ setupReaderForNextFile();
+ }
+ group = reader.read();
}
// Accept the group and tell the caller that there are more groups to read
@@ -80,13 +70,10 @@ public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup>
} catch (IOException e) {
// If an exception occurs, try to close the resources and throw a runtime exception
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e2) {
- // Ignore the exception as the original exception is more important
- }
- reader = null;
+ try {
+ reader.close();
+ } catch (IOException e2) {
+ // Ignore the exception as the original exception is more important
}
throw new GeoParquetException("IOException caught while trying to read the next file.", e);
}
@@ -102,34 +89,30 @@ public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup>
@Override
public Spliterator<GeoParquetGroup> trySplit() {
- if (queue.size() < 2) {
- // There is nothing left to split
- return null;
- }
-
- // Create a new spliterator by polling the next polledFileStatus
- FileStatus polledFileStatus = queue.poll();
+ List<FileStatus> sublist;
+ synchronized (fileStatuses) {
+ if (fileStatuses.size() < 2) {
+ // There is nothing left to split
+ return null;
+ }
- // If there are no more files, tell the caller that there is nothing to
split anymore
- if (polledFileStatus == null) {
- return null;
+ sublist = fileStatuses.subList(0, fileStatuses.size() / 2);
}
+ List<FileStatus> secondList = new ArrayList<>(sublist);
+ sublist.clear();
- // Return a new spliterator with the polledFileStatus
- return new GeoParquetGroupSpliterator(geoParquetReader,
- Map.of(polledFileStatus, files.get(polledFileStatus)));
+ // Return a new spliterator with the sublist
+ return new GeoParquetGroupSpliterator(geoParquetReader, secondList);
}
@Override
public long estimateSize() {
- return files.values().stream()
- .map(GeoParquetReader.FileInfo::recordCount)
- .reduce(0L, Long::sum);
+ return geoParquetReader.size();
}
@Override
public int characteristics() {
// The spliterator is not ordered, or sorted
- return NONNULL | IMMUTABLE | SIZED | DISTINCT;
+ return NONNULL | CONCURRENT | DISTINCT;
}
}
diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index e88f2c61..03a07e68 100644
--- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.*;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
@@ -45,10 +45,9 @@ import org.apache.parquet.schema.MessageType;
public class GeoParquetReader {
private final URI uri;
-
final Configuration configuration;
-
- private Map<FileStatus, FileInfo> files;
+ private List<FileStatus> files;
+ private Long groupCount;
record FileInfo(FileStatus file, Long recordCount, Map<String, String>
keyValueMetadata,
MessageType messageType, GeoParquetMetadata metadata,
@@ -64,51 +63,60 @@ public class GeoParquetReader {
this.configuration = configuration;
}
- public MessageType getParquetSchema() throws URISyntaxException {
- return files().values().stream()
+ public MessageType getParquetSchema() {
+ return files().stream()
.findFirst()
+ .map(this::getFileInfo)
.orElseThrow()
.messageType();
}
- public GeoParquetMetadata getGeoParquetMetadata() throws URISyntaxException {
- return files().values().stream()
+ private FileInfo getFileInfo(FileStatus fileStatus) {
+ try {
+ return buildFileInfo(fileStatus);
+ } catch (IOException e) {
+ throw new GeoParquetException("Failed to build Info", e);
+ }
+ }
+
+ public GeoParquetMetadata getGeoParquetMetadata() {
+ return files().stream()
.findFirst()
+ .map(this::getFileInfo)
.orElseThrow()
.metadata();
}
- public Schema getGeoParquetSchema() throws URISyntaxException {
- return files().values().stream()
+ public Schema getGeoParquetSchema() {
+ return files().stream()
.findFirst()
+ .map(this::getFileInfo)
.orElseThrow()
.geoParquetSchema();
}
- public Long size() throws URISyntaxException {
- return files().values().stream().map(FileInfo::recordCount).reduce(0L,
Long::sum);
+ public boolean validateSchemasAreIdentical() {
+ // Verify that the files all have the same schema
+ final int messageTypeCount =
files().stream().parallel().map(this::getFileInfo)
+ .map(FileInfo::messageType).collect(Collectors.toSet()).size();
+ return messageTypeCount == 1;
}
- private synchronized Map<FileStatus, FileInfo> files() {
+ public long size() {
+ if (groupCount == null) {
+ groupCount =
files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount)
+ .reduce(0L, Long::sum);
+ }
+ return groupCount;
+ }
+
+ private synchronized List<FileStatus> files() {
try {
if (files == null) {
- files = new HashMap<>();
- FileSystem fs = FileSystem.get(uri, configuration);
- FileStatus[] fileStatuses = fs.globStatus(new Path(uri));
-
- for (FileStatus file : fileStatuses) {
- files.put(file, buildFileInfo(file));
- }
-
- // Verify that the files all have the same schema
- MessageType commonMessageType = null;
- for (FileInfo entry : files.values()) {
- if (commonMessageType == null) {
- commonMessageType = entry.messageType;
- } else if (!commonMessageType.equals(entry.messageType)) {
- throw new GeoParquetException("The files do not have the same schema");
- }
- }
+ Path globPath = new Path(uri.getPath());
+ FileSystem fileSystem = FileSystem.get(uri, configuration);
+
+ files = new
ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath)));
}
} catch (IOException e) {
throw new GeoParquetException("IOException while attempting to list files.", e);
@@ -139,12 +147,16 @@ public class GeoParquetReader {
geoParquetMetadata, geoParquetSchema);
}
- public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
- return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()),
true);
+ public Stream<GeoParquetGroup> readParallel() {
+ return retrieveGeoParquetGroups(true);
+ }
+
+ private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel)
{
+ return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()),
inParallel);
}
- public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException
{
- return readParallel().sequential();
+ public Stream<GeoParquetGroup> read() {
+ return retrieveGeoParquetGroups(false);
}
private static Configuration createConfiguration() {
diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
index bd5dfa8f..618bf619 100644
--- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
+++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
@@ -19,10 +19,11 @@ package org.apache.baremaps.geoparquet;
import static org.junit.jupiter.api.Assertions.*;
-import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.testing.TestFiles;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -30,38 +31,62 @@ import org.junit.jupiter.api.Test;
class GeoParquetReaderTest {
@Test
- void read() throws IOException, URISyntaxException {
+ void read() {
URI geoParquet = TestFiles.GEOPARQUET.toUri();
- final boolean isPrintingContent = true;
+ final boolean isParallel = false;
final int expectedGroupCount = 5;
- readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+ readGroups(geoParquet, isParallel, expectedGroupCount);
}
- @Disabled("Takes too long. Around 10 minutes.")
+ @Disabled("Takes too long. Around 4 minutes.")
@Test
- void readExternal() throws IOException, URISyntaxException {
+ void readExternal() throws URISyntaxException {
URI geoParquet = new URI(
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
- final boolean isPrintingContent = false;
+ final boolean isParallel = true;
final int expectedGroupCount = 974708;
- readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+ readGroups(geoParquet, isParallel, expectedGroupCount);
}
- private static void readGroups(URI geoParquet, boolean isPrintingContent,
int expectedGroupCount)
- throws IOException, URISyntaxException {
+ private static void readGroups(URI geoParquet, boolean parallel,
+ int expectedGroupCount) {
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
final AtomicInteger groupCount = new AtomicInteger();
- geoParquetReader.read().forEach(group -> {
- groupCount.getAndIncrement();
- if (isPrintingContent) {
- System.out.println("-----");
- System.out.println(group.getSchema());
- System.out.println(group.getGeometryValue("geometry"));
- }
- });
+ Stream<GeoParquetGroup> geoParquetGroupStream;
+ if (parallel) {
+ geoParquetGroupStream = geoParquetReader.readParallel();
+ } else {
+ geoParquetGroupStream = geoParquetReader.read();
+ }
+ geoParquetGroupStream.forEach(group -> groupCount.getAndIncrement());
assertEquals(expectedGroupCount, groupCount.get());
}
+
+ @Test
+ void validateSchemas() throws URISyntaxException {
+ URI geoParquet = new URI(
+
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet");
+
+ GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+ assertTrue(geoParquetReader.validateSchemasAreIdentical(), "Schemas are identical");
+ }
+
+ @Test
+ void sizeForLocalities() throws URISyntaxException {
+ URI geoParquet = new URI(
+
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
+ GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+ assertEquals(974708L, geoParquetReader.size());
+ }
+
+ @Test
+ void sizeForBuildings() throws URISyntaxException {
+ URI geoParquet = new URI(
+
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=buildings/type=building/*.parquet");
+ GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+ assertEquals(2352441548L, geoParquetReader.size());
+ }
}