This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit e866f7625061e2c324b3e842dab600ae2d0d382d Author: Bertil Chapuis <[email protected]> AuthorDate: Mon Sep 30 22:20:21 2024 +0200 Improve spliterator concurrency --- .../geoparquet/GeoParquetGroupSpliterator.java | 129 +++++++++++---------- 1 file changed, 65 insertions(+), 64 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java index 7a86a386..c17797f6 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupSpliterator.java @@ -1,24 +1,8 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.baremaps.geoparquet; import java.io.IOException; -import java.util.*; +import java.util.List; +import java.util.Spliterator; import java.util.function.Consumer; import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; @@ -29,16 +13,37 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { private final GeoParquetReader geoParquetReader; private final List<FileStatus> fileStatuses; + private int currentFileIndex; + private int currentEndIndex; private ParquetReader<GeoParquetGroup> reader; - GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) { + public GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) { + this(geoParquetReader, files, 0, files.size()); + } + + GeoParquetGroupSpliterator( + GeoParquetReader geoParquetReader, + List<FileStatus> fileStatuses, + int startIndex, + int endIndex) { this.geoParquetReader = geoParquetReader; - this.fileStatuses = Collections.synchronizedList(files); + this.fileStatuses = fileStatuses; + this.currentFileIndex = startIndex; + this.currentEndIndex = endIndex; setupReaderForNextFile(); } + + private void setupReaderForNextFile() { - FileStatus fileStatus = fileStatuses.remove(0); + closeCurrentReader(); + + if (currentFileIndex >= currentEndIndex) { + reader = null; + return; + } + + FileStatus fileStatus = fileStatuses.get(currentFileIndex++); try { reader = createParquetReader(fileStatus); } catch (IOException e) { @@ -46,73 +51,69 @@ class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> { } } + private void closeCurrentReader() { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + // Ignore exceptions during close + } + reader = null; + } + } + @Override public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) { try { - // Read the next group - GeoParquetGroup group = reader.read(); - - // If the group is null, try to get the one from the next file. - while (group == null) { - synchronized (fileStatuses) { - if (fileStatuses.isEmpty()) { - reader.close(); - return false; - } - setupReaderForNextFile(); + while (true) { + if (reader == null) { + return false; } - group = reader.read(); - } - // Accept the group and tell the caller that there are more groups to read - action.accept(group); - return true; + GeoParquetGroup group = reader.read(); - } catch (IOException e) { - // If an exception occurs, try to close the resources and throw a runtime exception - try { - reader.close(); - } catch (IOException e2) { - // Ignore the exception as the original exception is more important + if (group == null) { + setupReaderForNextFile(); + continue; + } + + action.accept(group); + return true; } + } catch (IOException e) { + closeCurrentReader(); throw new GeoParquetException("IOException caught while trying to read the next file.", e); } } private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file) - throws IOException { - return ParquetReader - .builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(geoParquetReader.configuration) - .build(); + throws IOException { + return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath()) + .withConf(geoParquetReader.configuration) + .build(); } @Override public Spliterator<GeoParquetGroup> trySplit() { - List<FileStatus> sublist; - synchronized (fileStatuses) { - if (fileStatuses.size() < 2) { - // There is nothing left to split - return null; - } - - sublist = fileStatuses.subList(0, fileStatuses.size() / 2); + int remainingFiles = currentEndIndex - currentFileIndex; + if (remainingFiles <= 1) { + return null; } - List<FileStatus> secondList = new ArrayList<>(sublist); - sublist.clear(); - - // Return a new spliterator with the sublist - return new GeoParquetGroupSpliterator(geoParquetReader, secondList); + int mid = currentFileIndex + remainingFiles / 2; + GeoParquetGroupSpliterator split = new GeoParquetGroupSpliterator( + geoParquetReader, fileStatuses, mid, currentEndIndex); + this.currentEndIndex = mid; + return split; } @Override public long estimateSize() { - return geoParquetReader.size(); + // Return Long.MAX_VALUE as the actual number of elements is unknown + return Long.MAX_VALUE; } @Override public int characteristics() { - // The spliterator is not ordered, or sorted - return NONNULL | CONCURRENT | DISTINCT; + return NONNULL | IMMUTABLE; } }
