This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch geoparquet-filtering-and-simplification in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 52017bd08c4f82d02526937173a69f8e7cd33338 Author: Bertil Chapuis <[email protected]> AuthorDate: Mon Oct 7 23:46:45 2024 +0200 Remove unused writer class and use low level ParquetFileReader API --- .../geoparquet/GeoParquetTypeConversion.java | 8 +- .../geoparquet/{data => }/BinaryValue.java | 4 +- .../geoparquet/{data => }/BooleanValue.java | 4 +- .../geoparquet/{data => }/DoubleValue.java | 4 +- .../baremaps/geoparquet/{data => }/FloatValue.java | 4 +- .../{data => }/GeoParquetColumnMetadata.java | 2 +- .../geoparquet/{data => }/GeoParquetGroup.java | 2 +- .../{data => }/GeoParquetGroupConverter.java | 2 +- .../{data => }/GeoParquetGroupFactory.java | 6 +- .../geoparquet/{data => }/GeoParquetGroupImpl.java | 5 +- .../GeoParquetGroupRecordMaterializer.java | 4 +- .../geoparquet/{data => }/GeoParquetMetadata.java | 2 +- .../{data => }/GeoParquetPrimitiveConverter.java | 2 +- .../baremaps/geoparquet/GeoParquetReader.java | 123 ++------------- .../baremaps/geoparquet/GeoParquetSpliterator.java | 172 +++++++++++++++++++++ .../baremaps/geoparquet/{data => }/Int96Value.java | 4 +- .../geoparquet/{data => }/IntegerValue.java | 4 +- .../baremaps/geoparquet/{data => }/LongValue.java | 4 +- .../baremaps/geoparquet/{data => }/Primitive.java | 4 +- .../baremaps/geoparquet/common/GroupWriter.java | 61 -------- .../geoparquet/data/GeoParquetGroupWriter.java | 28 ---- .../geoparquet/data/GeoParquetMaterializer.java | 50 ------ .../hadoop/GeoParquetGroupReadSupport.java | 66 -------- .../hadoop/GeoParquetGroupWriteSupport.java | 85 ---------- .../geoparquet/hadoop/GeoParquetGroupWriter.java | 29 ---- .../geoparquet/hadoop/GeoParquetInputFormat.java | 35 ----- .../geoparquet/hadoop/GeoParquetOutputFormat.java | 59 ------- .../geoparquet/hadoop/GeoParquetWriter.java | 122 --------------- .../baremaps/geoparquet/GeoParquetReaderTest.java | 18 +-- 29 files changed, 224 insertions(+), 689 deletions(-) diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java index eed61e8d..3e618e9a 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetTypeConversion.java @@ -24,10 +24,10 @@ import java.util.Map; import org.apache.baremaps.data.storage.*; import org.apache.baremaps.data.storage.DataColumn.Cardinality; import org.apache.baremaps.data.storage.DataColumn.Type; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.GroupField; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; +import org.apache.baremaps.geoparquet.GeoParquetGroup; +import org.apache.baremaps.geoparquet.GeoParquetGroup.Field; +import org.apache.baremaps.geoparquet.GeoParquetGroup.GroupField; +import org.apache.baremaps.geoparquet.GeoParquetGroup.Schema; public class GeoParquetTypeConversion { diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BinaryValue.java similarity index 93% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BinaryValue.java index b02678c5..5ab2fab7 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BinaryValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BinaryValue.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; -public class BinaryValue extends Primitive { +class BinaryValue extends Primitive { private final Binary binary; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BooleanValue.java similarity index 92% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BooleanValue.java index 50b49497..b9ff75a8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/BooleanValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/BooleanValue.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.RecordConsumer; -public class BooleanValue extends Primitive { +class BooleanValue extends Primitive { private final boolean bool; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/DoubleValue.java similarity index 93% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/DoubleValue.java index 4537e10d..43bdf1b8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/DoubleValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/DoubleValue.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.RecordConsumer; -public class DoubleValue extends Primitive { +class DoubleValue extends Primitive { private final double value; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/FloatValue.java similarity index 93% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/FloatValue.java index 35aa0322..568cd005 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/FloatValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/FloatValue.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.RecordConsumer; -public class FloatValue extends Primitive { +class FloatValue extends Primitive { private final float value; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetColumnMetadata.java similarity index 98% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetColumnMetadata.java index 1638d287..aa5e96b2 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetColumnMetadata.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java similarity index 99% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java index 81366348..419b9f19 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import java.util.List; import org.apache.parquet.io.api.Binary; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java similarity index 97% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java index 6649aebd..5257b9ea 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java similarity index 96% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java index 5abe7764..bcea6b8b 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupFactory.java @@ -15,15 +15,15 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import java.util.List; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Field; +import org.apache.baremaps.geoparquet.GeoParquetGroup.Field; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -public class GeoParquetGroupFactory { +class GeoParquetGroupFactory { private final GroupType schema; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupImpl.java similarity index 99% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupImpl.java index 2a41d531..b9a6a607 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupImpl.java @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import java.util.ArrayList; import java.util.List; -import org.apache.baremaps.geoparquet.GeoParquetException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; @@ -29,7 +28,7 @@ import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.WKBWriter; -public class GeoParquetGroupImpl implements GeoParquetGroup { +class GeoParquetGroupImpl implements GeoParquetGroup { private final GroupType schema; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java similarity index 94% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java index 5813633d..4583ace8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupRecordMaterializer.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroupRecordMaterializer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; /* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license @@ -36,7 +36,7 @@ import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetGroupRecordMaterializer extends RecordMaterializer<GeoParquetGroup> { +class GeoParquetGroupRecordMaterializer extends RecordMaterializer<GeoParquetGroup> { private final GeoParquetGroupFactory groupFactory; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java similarity index 98% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java index 6e34931c..214a0840 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetMetadata.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java similarity index 98% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java index d75fccbc..a1ade639 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetPrimitiveConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetPrimitiveConverter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 3a9a705b..74981a77 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -23,15 +23,10 @@ import java.io.IOException; import java.net.URI; import java.util.*; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup.Schema; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupFactory; -import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; -import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport; +import org.apache.baremaps.geoparquet.GeoParquetGroup.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,8 +34,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; @@ -49,8 +44,8 @@ import org.apache.parquet.schema.MessageType; */ public class GeoParquetReader { - private final Configuration configuration; - private final List<FileStatus> files; + protected final Configuration configuration; + protected final List<FileStatus> files; private final AtomicLong groupCount = new AtomicLong(-1); public GeoParquetReader(URI uri) { @@ -88,13 +83,14 @@ public class GeoParquetReader { try { ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(configuration, fileStatus.getPath()); + long recordCount = parquetMetadata.getBlocks().stream() .mapToLong(BlockMetaData::getRowCount) .sum(); - MessageType messageType = parquetMetadata.getFileMetaData().getSchema(); - Map<String, String> keyValueMetadata = - parquetMetadata.getFileMetaData().getKeyValueMetaData(); + FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); + Map<String, String> keyValueMetadata = fileMetaData.getKeyValueMetaData(); + MessageType messageType = fileMetaData.getSchema(); GeoParquetMetadata geoParquetMetadata = null; Schema geoParquetSchema = null; @@ -160,17 +156,17 @@ public class GeoParquetReader { return groupCount.get(); } - private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) { - Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(0, files.size()); + private Stream<GeoParquetGroup> streamGeoParquetGroups(boolean inParallel) { + Spliterator<GeoParquetGroup> spliterator = new GeoParquetSpliterator(this, 0, files.size()); return StreamSupport.stream(spliterator, inParallel); } public Stream<GeoParquetGroup> read() { - return retrieveGeoParquetGroups(false); + return streamGeoParquetGroups(false); } public Stream<GeoParquetGroup> readParallel() { - return retrieveGeoParquetGroups(true); + return streamGeoParquetGroups(true); } private static Configuration createDefaultConfiguration() { @@ -192,99 +188,4 @@ public class GeoParquetReader { } - private class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { - - private int currentFileIndex; - private int currentEndIndex; - private ParquetReader<GeoParquetGroup> reader; - - GeoParquetSpliterator( - int startIndex, - int endIndex) { - this.currentFileIndex = startIndex; - this.currentEndIndex = endIndex; - setupReaderForNextFile(); - } - - private void setupReaderForNextFile() { - closeCurrentReader(); - - if (currentFileIndex >= currentEndIndex) { - reader = null; - return; - } - - FileStatus fileStatus = files.get(currentFileIndex++); - try { - reader = createParquetReader(fileStatus); - } catch (IOException e) { - throw new GeoParquetException("Failed to create reader for " + fileStatus, e); - } - } - - private void closeCurrentReader() { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - // Ignore exceptions during close - } - reader = null; - } - } - - @Override - public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) { - try { - while (true) { - if (reader == null) { - return false; - } - - GeoParquetGroup group = reader.read(); - - if (group == null) { - setupReaderForNextFile(); - continue; - } - - action.accept(group); - return true; - } - } catch (IOException e) { - closeCurrentReader(); - throw new GeoParquetException("IOException caught while trying to read the next file.", e); - } - } - - private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file) - throws IOException { - return ParquetReader.builder(new GeoParquetGroupReadSupport(), file.getPath()) - .withConf(configuration) - .build(); - } - - @Override - public Spliterator<GeoParquetGroup> trySplit() { - int remainingFiles = currentEndIndex - currentFileIndex; - if (remainingFiles <= 1) { - return null; - } - int mid = currentFileIndex + remainingFiles / 2; - GeoParquetSpliterator split = new GeoParquetSpliterator(mid, currentEndIndex); - this.currentEndIndex = mid; - return split; - } - - @Override - public long estimateSize() { - // Return Long.MAX_VALUE as the actual number of elements is unknown - return Long.MAX_VALUE; - } - - @Override - public int characteristics() { - return NONNULL | IMMUTABLE; - } - } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java new file mode 100644 index 00000000..90e3d8cb --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetSpliterator.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Spliterator; +import java.util.function.Consumer; +import org.apache.hadoop.fs.FileStatus; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; + +class GeoParquetSpliterator implements Spliterator<GeoParquetGroup> { + + private final GeoParquetReader reader; + private ParquetFileReader fileReader; + private int fileStartIndex; + private int fileEndIndex; + private MessageType schema; + private GeoParquetMetadata metadata; + private MessageColumnIO columnIO; + private RecordReader<GeoParquetGroup> recordReader; + private int currentRowGroup; + private long rowsReadInGroup; + private long rowsInCurrentGroup; + + GeoParquetSpliterator(GeoParquetReader reader, + int fileStartIndex, + int fileEndIndex) { + this.reader = reader; + this.fileStartIndex = fileStartIndex; + this.fileEndIndex = fileEndIndex; + setupReaderForNextFile(); + } + + private void setupReaderForNextFile() { + closeCurrentReader(); + + if (fileStartIndex >= fileEndIndex) { + fileReader = null; + return; + } + + FileStatus fileStatus = reader.files.get(fileStartIndex++); + try { + InputFile inputFile = HadoopInputFile.fromPath(fileStatus.getPath(), reader.configuration); + fileReader = ParquetFileReader.open(inputFile); + + FileMetaData fileMetaData = fileReader.getFooter().getFileMetaData(); + + schema = fileMetaData.getSchema(); + metadata = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .readValue(fileMetaData.getKeyValueMetaData().get("geo"), GeoParquetMetadata.class); + + columnIO = new ColumnIOFactory().getColumnIO(schema); + currentRowGroup = 0; + rowsReadInGroup = 0; + rowsInCurrentGroup = 0; + advanceToNextRowGroup(); + } catch (IOException e) { + throw new GeoParquetException("Failed to create reader for " + fileStatus, e); + } + } + + private void advanceToNextRowGroup() throws IOException { + if (currentRowGroup >= fileReader.getRowGroups().size()) { + setupReaderForNextFile(); + return; + } + + PageReadStore pages = fileReader.readNextRowGroup(); + if (pages == null) { + setupReaderForNextFile(); + return; + } + + rowsInCurrentGroup = pages.getRowCount(); + rowsReadInGroup = 0; + + GeoParquetGroupRecordMaterializer materializer = + new GeoParquetGroupRecordMaterializer(schema, metadata); + recordReader = columnIO.getRecordReader(pages, materializer); + currentRowGroup++; + } + + @Override + public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) { + try { + while (true) { + if (fileReader == null) { + return false; + } + + if (rowsReadInGroup >= rowsInCurrentGroup) { + advanceToNextRowGroup(); + continue; + } + + GeoParquetGroup group = recordReader.read(); + if (group == null) { + // Should not happen unless there is an error + throw new GeoParquetException("Unexpected null group read from recordReader."); + } + + rowsReadInGroup++; + action.accept(group); + return true; + } + } catch (IOException e) { + closeCurrentReader(); + throw new GeoParquetException("IOException caught while trying to read the next record.", e); + } + } + + private void closeCurrentReader() { + if (fileReader != null) { + try { + fileReader.close(); + } catch (IOException e) { + // Ignore exceptions during close + } + fileReader = null; + } + } + + @Override + public Spliterator<GeoParquetGroup> trySplit() { + int remainingFiles = fileEndIndex - fileStartIndex; + if (remainingFiles <= 1) { + return null; + } + int mid = fileStartIndex + remainingFiles / 2; + GeoParquetSpliterator split = new GeoParquetSpliterator(reader, mid, fileEndIndex); + this.fileEndIndex = mid; + return split; + } + + @Override + public long estimateSize() { + // Return Long.MAX_VALUE as the actual number of elements is unknown + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return NONNULL | IMMUTABLE; + } +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Int96Value.java similarity index 93% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Int96Value.java index 33576e5c..160cde64 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Int96Value.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; -public class Int96Value extends Primitive { +class Int96Value extends Primitive { private final Binary value; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/IntegerValue.java similarity index 92% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/IntegerValue.java index 6b3086e9..e740b541 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/IntegerValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/IntegerValue.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.RecordConsumer; -public class IntegerValue extends Primitive { +class IntegerValue extends Primitive { private final int value; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/LongValue.java similarity index 93% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/LongValue.java index 34483b68..67b7c7f3 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/LongValue.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/LongValue.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.RecordConsumer; -public class LongValue extends Primitive { +class LongValue extends Primitive { private final long value; diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Primitive.java similarity index 95% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Primitive.java index 79bd9517..46ee4e8a 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/Primitive.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.baremaps.geoparquet.data; +package org.apache.baremaps.geoparquet; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.locationtech.jts.geom.Geometry; -public abstract class Primitive { +abstract class Primitive { public String getString() { throw new UnsupportedOperationException(); diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java deleted file mode 100644 index 09fc039b..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.common; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.Type; - -public abstract class GroupWriter { - protected final RecordConsumer recordConsumer; - protected final GroupType schema; - - protected GroupWriter(RecordConsumer recordConsumer, GroupType schema) { - this.recordConsumer = recordConsumer; - this.schema = schema; - } - - public final void write(GeoParquetGroupImpl group) { - recordConsumer.startMessage(); - writeGroup(group, schema); - recordConsumer.endMessage(); - } - - private void writeGroup(GeoParquetGroupImpl group, GroupType type) { - int fieldCount = type.getFieldCount(); - for (int field = 0; field < fieldCount; ++field) { - int valueCount = group.getFieldRepetitionCount(field); - if (valueCount > 0) { - Type fieldType = type.getType(field); - String fieldName = fieldType.getName(); - recordConsumer.startField(fieldName, field); - for (int index = 0; index < valueCount; ++index) { - if (fieldType.isPrimitive()) { - group.writeValue(field, index, recordConsumer); - } else { - recordConsumer.startGroup(); - writeGroup(group.getGroup(field, index), fieldType.asGroupType()); - recordConsumer.endGroup(); - } - } - recordConsumer.endField(fieldName, field); - } - } - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java deleted file mode 100644 index 911a3dda..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.baremaps.geoparquet.common.GroupWriter; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; - -public class GeoParquetGroupWriter extends GroupWriter { - public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType schema) { - super(recordConsumer, schema); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java deleted file mode 100644 index a8cc46a0..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.data; - -import org.apache.parquet.io.api.GroupConverter; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetMaterializer extends RecordMaterializer<GeoParquetGroupImpl> { - - private final GeoParquetGroupFactory groupFactory; - - private final GeoParquetGroupConverter root; - - public GeoParquetMaterializer(MessageType schema, GeoParquetMetadata metadata) { - this.groupFactory = new GeoParquetGroupFactory(schema, metadata); - this.root = new GeoParquetGroupConverter(null, 0, schema) { - @Override - public void start() { - this.current = groupFactory.newGroup(); - } - }; - } - - @Override - public GeoParquetGroupImpl getCurrentRecord() { - return root.getCurrentRecord(); - } - - @Override - public GroupConverter getRootConverter() { - return root; - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java deleted file mode 100644 index 887d7471..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Map; -import org.apache.baremaps.geoparquet.GeoParquetException; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordMaterializer; -import org.apache.baremaps.geoparquet.data.GeoParquetMetadata; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetGroupReadSupport extends ReadSupport<GeoParquetGroup> { - - public GeoParquetGroupReadSupport() {} - - @Override - public ReadContext init( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema) { - String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); - MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); - return new ReadContext(requestedProjection); - } - - @Override - public RecordMaterializer<GeoParquetGroup> prepareForRead( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - - // Read the GeoParquet metadata of the Parquet file - try { - String json = keyValueMetaData.get("geo"); - GeoParquetMetadata metadata = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - .readValue(json, GeoParquetMetadata.class); - return new GeoParquetGroupRecordMaterializer(readContext.getRequestedSchema(), metadata); - } catch (JsonProcessingException e) { - throw new GeoParquetException("Failed to read GeoParquet's metadata of the Parquet file", e); - } - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java deleted file mode 100644 index b28607d6..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetGroupWriteSupport extends WriteSupport<GeoParquetGroupImpl> { - - public static final String PARQUET_EXAMPLE_SCHEMA = "parquet.example.schema"; - - public static void setSchema(MessageType schema, Configuration configuration) { - configuration.set(PARQUET_EXAMPLE_SCHEMA, schema.toString()); - } - - public static MessageType getSchema(Configuration configuration) { - return parseMessageType( - Objects.requireNonNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA)); - } - - private MessageType schema; - private GeoParquetGroupWriter groupWriter; - private final Map<String, String> extraMetaData; - - public GeoParquetGroupWriteSupport() { - this(null, new HashMap<>()); - } - - GeoParquetGroupWriteSupport(MessageType schema) { - this(schema, new HashMap<>()); - } - - GeoParquetGroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) { - this.schema = schema; - this.extraMetaData = extraMetaData; - } - - @Override - public String getName() { - return "example"; - } - - @Override - public WriteContext init(Configuration configuration) { - // if present, prefer the schema passed to the constructor - if (schema == null) { - schema = getSchema(configuration); - } - return new WriteContext(schema, this.extraMetaData); - } - - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - groupWriter = new GeoParquetGroupWriter(recordConsumer, schema); - } - - @Override - public void write(GeoParquetGroupImpl group) { - groupWriter.write(group); - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java deleted file mode 100644 index 6e579ca9..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.common.GroupWriter; -import org.apache.parquet.io.api.RecordConsumer; -import org.apache.parquet.schema.GroupType; - -public class GeoParquetGroupWriter extends GroupWriter { - - public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType schema) { - super(recordConsumer, schema); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java deleted file mode 100644 index 1c2bc8ff..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.parquet.hadoop.ParquetInputFormat; - -/** - * Example input format to read Parquet files - * - * This Input format uses a rather inefficient data model but works independently of higher level - * abstractions. - */ -public class GeoParquetInputFormat extends ParquetInputFormat<GeoParquetGroup> { - - public GeoParquetInputFormat() { - super(GeoParquetGroupReadSupport.class); - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java deleted file mode 100644 index a4d73320..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetOutputFormat.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.apache.parquet.hadoop.util.ContextUtil; -import org.apache.parquet.schema.MessageType; - -/** - * An example output format - * - * must be provided the schema up front - * - * @see GeoParquetOutputFormat#setSchema(Job, MessageType) - * @see GeoParquetGroupWriteSupport#PARQUET_EXAMPLE_SCHEMA - */ -public class GeoParquetOutputFormat extends ParquetOutputFormat<GeoParquetGroupImpl> { - - /** - * set the schema being written to the job conf - * - * @param job a job - * @param schema the schema of the data - */ - public static void setSchema(Job job, MessageType schema) { - GeoParquetGroupWriteSupport.setSchema(schema, ContextUtil.getConfiguration(job)); - } - - /** - * retrieve the schema from the conf - * - * @param job a job - * @return the schema - */ - public static MessageType getSchema(Job job) { - return GeoParquetGroupWriteSupport.getSchema(ContextUtil.getConfiguration(job)); - } - - public GeoParquetOutputFormat() { - super(new GeoParquetGroupWriteSupport()); - } -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java deleted file mode 100644 index 4395d5e6..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetWriter.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.io.OutputFile; -import org.apache.parquet.schema.MessageType; - -/** - * An example file writer class. THIS IS AN EXAMPLE ONLY AND NOT INTENDED FOR USE. - */ -public class GeoParquetWriter extends ParquetWriter<GeoParquetGroupImpl> { - - /** - * Creates a Builder for configuring ParquetWriter with the example object model. THIS IS AN - * EXAMPLE ONLY AND NOT INTENDED FOR USE. - * - * @param file the output file to create - * @return a {@link Builder} to create a {@link ParquetWriter} - */ - public static Builder builder(Path file) { - return new Builder(file); - } - - /** - * Creates a Builder for configuring ParquetWriter with the example object model. THIS IS AN - * EXAMPLE ONLY AND NOT INTENDED FOR USE. - * - * @param file the output file to create - * @return a {@link Builder} to create a {@link ParquetWriter} - */ - public static Builder builder(OutputFile file) { - return new Builder(file); - } - - /** - * Create a new {@link GeoParquetWriter}. - * - * @param file The file name to write to. - * @param writeSupport The schema to write with. - * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED - * @param blockSize the block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other - * purposes. - * @param enableDictionary Whether to use a dictionary to compress columns. - * @param conf The Configuration to use. - * @throws IOException - */ - @SuppressWarnings("squid:S107") - GeoParquetWriter( - Path file, - WriteSupport<GeoParquetGroupImpl> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean enableValidation, - ParquetProperties.WriterVersion writerVersion, - Configuration conf) throws IOException { - super(file, writeSupport, compressionCodecName, blockSize, pageSize, - pageSize, enableDictionary, enableValidation, writerVersion, conf); - } - - public static class Builder extends ParquetWriter.Builder<GeoParquetGroupImpl, Builder> { - - private MessageType type = null; - private Map<String, String> extraMetaData = new HashMap<String, String>(); - - private Builder(Path file) { - super(file); - } - - private Builder(OutputFile file) { - super(file); - } - - public Builder withType(MessageType type) { - this.type = type; - return this; - } - - public Builder withExtraMetaData(Map<String, String> extraMetaData) { - this.extraMetaData = extraMetaData; - return this; - } - - @Override - protected Builder self() { - return this; - } - - @Override - protected WriteSupport<GeoParquetGroupImpl> getWriteSupport(Configuration conf) { - return new GeoParquetGroupWriteSupport(type, extraMetaData); - } - - } -} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index f0c38ab4..52813474 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -23,21 +23,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; import org.apache.baremaps.testing.TestFiles; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; class GeoParquetReaderTest { - @Test - void read() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); - final boolean isParallel = false; - final int expectedGroupCount = 5; - readGroups(geoParquet, isParallel, expectedGroupCount); - } - private static void readGroups(URI geoParquet, boolean parallel, int expectedGroupCount) { GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); @@ -49,10 +40,17 @@ class GeoParquetReaderTest { geoParquetGroupStream = geoParquetReader.read(); } geoParquetGroupStream.forEach(group -> groupCount.getAndIncrement()); - assertEquals(expectedGroupCount, groupCount.get()); } + @Test + void read() { + URI geoParquet = TestFiles.GEOPARQUET.toUri(); + final boolean isParallel = false; + final int expectedGroupCount = 5; + readGroups(geoParquet, isParallel, expectedGroupCount); + } + @Disabled("Requires access to the Internet") @Test void readExternal() throws URISyntaxException {
