This is an automated email from the ASF dual-hosted git repository. bchapuis pushed a commit to branch 849-geoparquet in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
commit 3b9eaaedf235dbc58c4869fa96a6acabd66192c3 Author: Bertil Chapuis <[email protected]> AuthorDate: Thu May 23 08:27:42 2024 +0200 Add a group interface --- .../baremaps/geoparquet/GeoParquetReader.java | 46 ++- .../baremaps/geoparquet/data/GeoParquetGroup.java | 374 ++++++++++----------- .../geoparquet/data/GeoParquetGroupConverter.java | 4 +- .../geoparquet/data/GeoParquetGroupFactory.java | 13 +- ...oParquetGroup.java => GeoParquetGroupImpl.java} | 131 ++++---- .../geoparquet/data/GeoParquetGroupWriter.java | 12 +- .../geoparquet/data/GeoParquetMaterializer.java | 12 +- .../data/{NanoTime.java => NanoTimeValue.java} | 12 +- .../geoparquet/hadoop/GeoParquetInputFormat.java | 35 -- .../geoparquet/hadoop/GeoParquetReadSupport.java | 49 --- .../baremaps/geoparquet/GeoParquetReaderTest.java | 15 +- 11 files changed, 305 insertions(+), 398 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 933145df..c7348394 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -88,9 +88,8 @@ public class GeoParquetReader { .readValue(json, GeoParquetMetadata.class); // Store the metadata of the Parquet file - this.metadata.put( - fileStatus, - new FileInfo(rowCount, parquetMetadata, geoParquetMetadata)); + FileInfo fileInfo = new FileInfo(rowCount, parquetMetadata, geoParquetMetadata); + this.metadata.put(fileStatus, fileInfo); } } } catch (Exception e) { @@ -98,7 +97,7 @@ public class GeoParquetReader { } } - public Stream<GeoParquetGroup> read() throws IOException { + public Stream<GeoParquetGroupImpl> read() throws IOException { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(new GroupIterator(), Spliterator.ORDERED), false); @@ -129,38 +128,31 @@ public class 
GeoParquetReader { null); } - private class GroupIterator implements Iterator<GeoParquetGroup> { + private class GroupIterator implements Iterator<GeoParquetGroupImpl> { private Iterator<Map.Entry<FileStatus, FileInfo>> fileIterator; - private Map.Entry<FileStatus, FileInfo> currentFileStatus; - private Iterator<PageReadStore> pageReadStoreIterator; - private PageReadStore currentPageReadStore; - - private Iterator<GeoParquetGroup> simpleGroupIterator; - - private GeoParquetGroup currentGeoParquetGroup; + private Iterator<GeoParquetGroupImpl> geoParquetGroupIterator; public GroupIterator() throws IOException { this.fileIterator = metadata.entrySet().iterator(); this.currentFileStatus = fileIterator.next(); this.pageReadStoreIterator = new PageReadStoreIterator(currentFileStatus); this.currentPageReadStore = pageReadStoreIterator.next(); - this.simpleGroupIterator = new GeoParquetGroupIterator( + this.geoParquetGroupIterator = new GeoParquetGroupIterator( currentFileStatus.getValue(), currentPageReadStore); - this.currentGeoParquetGroup = simpleGroupIterator.next(); } @Override public boolean hasNext() { - if (simpleGroupIterator.hasNext()) { + if (geoParquetGroupIterator.hasNext()) { return true; } else if (pageReadStoreIterator.hasNext()) { currentPageReadStore = pageReadStoreIterator.next(); - simpleGroupIterator = new GeoParquetGroupIterator( + geoParquetGroupIterator = new GeoParquetGroupIterator( currentFileStatus.getValue(), currentPageReadStore); return hasNext(); @@ -178,9 +170,11 @@ public class GeoParquetReader { } @Override - public GeoParquetGroup next() { - currentGeoParquetGroup = simpleGroupIterator.next(); - return currentGeoParquetGroup; + public GeoParquetGroupImpl next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return geoParquetGroupIterator.next(); } } @@ -234,29 +228,33 @@ public class GeoParquetReader { } } - private static class GeoParquetGroupIterator implements Iterator<GeoParquetGroup> { + private static class 
GeoParquetGroupIterator implements Iterator<GeoParquetGroupImpl> { private final long rowCount; - private final RecordReader<GeoParquetGroup> recordReader; + private final RecordReader<GeoParquetGroupImpl> recordReader; private long i = 0; private GeoParquetGroupIterator( FileInfo fileInfo, PageReadStore pageReadStore) { + GeoParquetMetadata metadata = fileInfo.getGeoParquetMetadata(); MessageType schema = fileInfo.getParquetMetadata().getFileMetaData().getSchema(); this.rowCount = pageReadStore.getRowCount(); this.recordReader = new ColumnIOFactory() .getColumnIO(schema) - .getRecordReader(pageReadStore, new GeoParquetMaterializer(schema)); + .getRecordReader(pageReadStore, new GeoParquetMaterializer(schema, metadata)); } @Override public boolean hasNext() { - return i <= rowCount; + return i < rowCount; } @Override - public GeoParquetGroup next() { + public GeoParquetGroupImpl next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } i++; return recordReader.read(); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java index 53b0b8f0..297cc617 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java @@ -17,265 +17,245 @@ package org.apache.baremaps.geoparquet.data; -import java.util.ArrayList; import java.util.List; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.Type; import org.locationtech.jts.geom.Geometry; -import org.locationtech.jts.io.ParseException; -import org.locationtech.jts.io.WKBReader; -import org.locationtech.jts.io.WKBWriter; -public class GeoParquetGroup { +/** + * A group of fields in a GeoParquet file. 
+ * + */ +public interface GeoParquetGroup { - private final GroupType groupType; + /** + * Returns the GeoParquet schema of the group built upon the Parquet schema and the GeoParquet + * metadata. + * + * @return the GeoParquet schema + */ + Schema getSchema(); - private final List<Object>[] data; + /** + * Returns the Parquet schema of the group. + * + * @return the Parquet schema + */ + GroupType getParquetSchema(); - @SuppressWarnings("unchecked") - public GeoParquetGroup(GroupType groupType) { - this.groupType = groupType; - this.data = new List[groupType.getFields().size()]; - for (int i = 0; i < groupType.getFieldCount(); i++) { - this.data[i] = new ArrayList<>(); - } - } + /** + * Returns the GeoParquet metadata of the group. + * + * @return the GeoParquet metadata + */ + GeoParquetMetadata getGeoParquetMetadata(); - public GroupType getGroupType() { - return groupType; - } + /** + * Creates a new empty group in the group at the specified field index. + * + * @param fieldIndex the field index + * @return the new group + */ + GeoParquetGroup createGroup(int fieldIndex); - public GeoParquetGroup addGroup(int fieldIndex) { - GeoParquetGroup g = new GeoParquetGroup(groupType.getType(fieldIndex).asGroupType()); - add(fieldIndex, g); - return g; - } + Binary getBinaryValue(int fieldIndex); - public GeoParquetGroup addGroup(String field) { - return addGroup(getGroupType().getFieldIndex(field)); - } + List<Binary> getBinaryValues(int fieldIndex); - public GeoParquetGroup getGroup(int fieldIndex, int index) { - return (GeoParquetGroup) getValue(fieldIndex, index); - } + Boolean getBooleanValue(int fieldIndex); - public GeoParquetGroup getGroup(String field, int index) { - return getGroup(getGroupType().getFieldIndex(field), index); - } + List<Boolean> getBooleanValues(int fieldIndex); - private Object getValue(int fieldIndex, int index) { - List<Object> list; - try { - list = data[fieldIndex]; - } catch (IndexOutOfBoundsException e) { - throw new RuntimeException( - 
"not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) - + ") in group:\n" + this); - } - try { - return list.get(index); - } catch (IndexOutOfBoundsException e) { - throw new RuntimeException( - "not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) - + ") element number " + index + " in group:\n" + this); - } - } + Double getDoubleValue(int fieldIndex); - public int getFieldRepetitionCount(int fieldIndex) { - List<Object> list = data[fieldIndex]; - return list == null ? 0 : list.size(); - } + List<Double> getDoubleValues(int fieldIndex); - public String getValueToString(int fieldIndex, int index) { - return String.valueOf(getValue(fieldIndex, index)); - } + Float getFloatValue(int fieldIndex); - public String getString(int fieldIndex, int index) { - return ((BinaryValue) getValue(fieldIndex, index)).getString(); - } + List<Float> getFloatValues(int fieldIndex); - public int getInteger(int fieldIndex, int index) { - return ((IntegerValue) getValue(fieldIndex, index)).getInteger(); - } + Integer getIntegerValue(int fieldIndex); - public long getLong(int fieldIndex, int index) { - return ((LongValue) getValue(fieldIndex, index)).getLong(); - } + List<Integer> getIntegerValues(int fieldIndex); - public double getDouble(int fieldIndex, int index) { - return ((DoubleValue) getValue(fieldIndex, index)).getDouble(); - } + Long getLongValue(int fieldIndex); - public float getFloat(int fieldIndex, int index) { - return ((FloatValue) getValue(fieldIndex, index)).getFloat(); - } + List<Long> getLongValues(int fieldIndex); - public boolean getBoolean(int fieldIndex, int index) { - return ((BooleanValue) getValue(fieldIndex, index)).getBoolean(); - } + String getStringValue(int fieldIndex); - public Binary getBinary(int fieldIndex, int index) { - return ((BinaryValue) getValue(fieldIndex, index)).getBinary(); - } + List<String> getStringValues(int fieldIndex); - public NanoTime getTimeNanos(int fieldIndex, int index) { - return 
NanoTime.fromInt96((Int96Value) getValue(fieldIndex, index)); - } + Geometry getGeometryValue(int fieldIndex); - public Binary getInt96(int fieldIndex, int index) { - return ((Int96Value) getValue(fieldIndex, index)).getInt96(); - } + List<Geometry> getGeometryValues(int fieldIndex); - public Geometry getGeometry(int fieldIndex, int index) { - byte[] bytes = ((BinaryValue) getValue(fieldIndex, index)).getBinary().getBytes(); - try { - return new WKBReader().read(bytes); - } catch (ParseException e) { - throw new RuntimeException(e); - } - } + GeoParquetGroup getGroupValue(int fieldIndex); - private void add(int fieldIndex, Primitive value) { - Type type = groupType.getType(fieldIndex); - List<Object> list = data[fieldIndex]; - if (!type.isRepetition(Type.Repetition.REPEATED) - && !list.isEmpty()) { - throw new IllegalStateException("field " + fieldIndex + " (" + type.getName() - + ") can not have more than one value: " + list); - } - list.add(value); - } + List<GeoParquetGroup> getGroupValues(int fieldIndex); - public void add(int fieldIndex, int value) { - add(fieldIndex, new IntegerValue(value)); - } + Binary getBinaryValue(String columnName); - public void add(int fieldIndex, long value) { - add(fieldIndex, new LongValue(value)); - } + List<Binary> getBinaryValues(String columnName); - public void add(int fieldIndex, String value) { - add(fieldIndex, new BinaryValue(Binary.fromString(value))); - } + Boolean getBooleanValue(String columnName); - public void add(int fieldIndex, NanoTime value) { - add(fieldIndex, value.toInt96()); - } + List<Boolean> getBooleanValues(String columnName); - public void add(int fieldIndex, boolean value) { - add(fieldIndex, new BooleanValue(value)); - } + Double getDoubleValue(String columnName); - public void add(int fieldIndex, Binary value) { - switch (getGroupType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - add(fieldIndex, new BinaryValue(value)); - break; - case 
INT96: - add(fieldIndex, new Int96Value(value)); - break; - default: - throw new UnsupportedOperationException( - getGroupType().asPrimitiveType().getName() + " not supported for Binary"); - } - } + List<Double> getDoubleValues(String columnName); - public void add(int fieldIndex, float value) { - add(fieldIndex, new FloatValue(value)); - } + Float getFloatValue(String columnName); - public void add(int fieldIndex, double value) { - add(fieldIndex, new DoubleValue(value)); - } + List<Float> getFloatValues(String columnName); - public void add(int fieldIndex, GeoParquetGroup value) { - data[fieldIndex].add(value); - } + Integer getIntegerValue(String columnName); - public void add(int fieldIndex, Geometry geometry) { - byte[] bytes = new WKBWriter().write(geometry); - add(fieldIndex, Binary.fromConstantByteArray(bytes)); - } + List<Integer> getIntegerValues(String columnName); - public void add(String field, int value) { - add(getGroupType().getFieldIndex(field), value); - } + Long getLongValue(String columnName); - public void add(String field, long value) { - add(getGroupType().getFieldIndex(field), value); - } + List<Long> getLongValues(String columnName); + + String getStringValue(String columnName); + + List<String> getStringValues(String columnName); + + Geometry getGeometryValue(String columnName); + + List<Geometry> getGeometryValues(String columnName); + + GeoParquetGroup getGroupValue(String columnName); + + List<GeoParquetGroup> getGroupValues(String columnName); + + void setBinaryValue(int fieldIndex, Binary binaryValue); + + void setBinaryValues(int fieldIndex, List<Binary> binaryValues); + + void setBooleanValue(int fieldIndex, Boolean booleanValue); + + void setBooleanValues(int fieldIndex, List<Boolean> booleanValues); + + void setDoubleValue(int fieldIndex, Double doubleValue); + + void setDoubleValues(int fieldIndex, List<Double> doubleValues); + + void setFloatValue(int fieldIndex, Float floatValue); + + void setFloatValues(int fieldIndex, 
List<Float> floatValues); + + void setIntegerValue(int fieldIndex, Integer integerValue); + + void setIntegerValues(int fieldIndex, List<Integer> integerValues); + + void setLongValue(int fieldIndex, Long longValue); + + void setLongValues(int fieldIndex, List<Long> longValues); + + void setStringValue(int fieldIndex, String stringValue); + + void setStringValues(int fieldIndex, List<String> stringValues); + + void setGeometryValue(int fieldIndex, Geometry geometryValue); + + void setGeometryValues(int fieldIndex, List<Geometry> geometryValues); + + void setGroupValue(int fieldIndex, GeoParquetGroup groupValue); + + void setGroupValues(int fieldIndex, List<GeoParquetGroup> groupValues); + + void setBinaryValue(String columnName, Binary binaryValue); + + void setBinaryValues(String columnName, List<Binary> binaryValues); + + void setBooleanValue(String columnName, Boolean booleanValue); + + void setBooleanValues(String columnName, List<Boolean> booleanValues); + + void setDoubleValue(String columnName, Double doubleValue); + + void setDoubleValues(String columnName, List<Double> doubleValues); + + void setFloatValue(String columnName, Float floatValue); + + void setFloatValues(String columnName, List<Float> floatValues); + + void setIntegerValue(String columnName, Integer integerValue); + + void setIntegerValues(String columnName, List<Integer> integerValues); + + void setLongValue(String columnName, Long longValue); + + void setLongValues(String columnName, List<Long> longValues); + + void setStringValue(String columnName, String stringValue); + + void setStringValues(String columnName, List<String> stringValues); + + void setGeometryValue(String columnName, Geometry geometryValue); + + void setGeometryValues(String columnName, List<Geometry> geometryValues); + + void setGroupValue(String columnName, GeoParquetGroup groupValue); + + void setGroupValues(String columnName, List<GeoParquetGroup> groupValues); + + /** + * A GeoParquet schema that describes the fields 
of a group and can easily be introspected. + * + * @param fields the fields of the schema + */ + record Schema(List<Field> fields) { - public void add(String field, float value) { - add(getGroupType().getFieldIndex(field), value); } - public void add(String field, double value) { - add(getGroupType().getFieldIndex(field), value); + /** + * A sealed interface for the fields of a GeoParquet schema that can be explored with pattern + * matching. + */ + sealed + interface Field { + String name(); + + Cardinality cardinality(); } - public void add(String field, String value) { - add(getGroupType().getFieldIndex(field), value); + record BinaryField(String name, Cardinality cardinality) implements Field { } - public void add(String field, NanoTime value) { - add(getGroupType().getFieldIndex(field), value); + record BooleanField(String name, Cardinality cardinality) implements Field { } - public void add(String field, boolean value) { - add(getGroupType().getFieldIndex(field), value); + record DoubleField(String name, Cardinality cardinality) implements Field { } - public void add(String field, Binary value) { - add(getGroupType().getFieldIndex(field), value); + record FloatField(String name, Cardinality cardinality) implements Field { } - public void add(String field, GeoParquetGroup value) { - add(getGroupType().getFieldIndex(field), value); + record IntegerField(String name, Cardinality cardinality) implements Field { } - public void add(String field, Geometry geometry) { - byte[] bytes = new WKBWriter().write(geometry); - add(getGroupType().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); + record LongField(String name, Cardinality cardinality) implements Field { } - public void writeValue(int field, int index, RecordConsumer recordConsumer) { - ((Primitive) getValue(field, index)).writeValue(recordConsumer); + record StringField(String name, Cardinality cardinality) implements Field { } - @Override - public String toString() { - return toString(""); + record 
GeometryField(String name, Cardinality cardinality, int srid) implements Field { } - private void appendToString(StringBuilder builder, String indent) { - int i = 0; - for (Type field : groupType.getFields()) { - String name = field.getName(); - List<Object> values = data[i]; - ++i; - if (values != null && !values.isEmpty()) { - for (Object value : values) { - builder.append(indent).append(name); - if (value == null) { - builder.append(": NULL\n"); - } else if (value instanceof GeoParquetGroup) { - builder.append('\n'); - ((GeoParquetGroup) value).appendToString(builder, indent + " "); - } else { - builder.append(": ").append(value.toString()).append('\n'); - } - } - } - } + record GroupField(String name, Cardinality cardinality, Schema schema) implements Field { } - public String toString(String indent) { - StringBuilder builder = new StringBuilder(); - appendToString(builder, indent); - return builder.toString(); + /** + * The cardinality of a GeoParquet field. + */ + enum Cardinality { + REQUIRED, + OPTIONAL, + REPEATED } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java index c14107e3..c2a04f4d 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java @@ -26,7 +26,7 @@ class GeoParquetGroupConverter extends GroupConverter { private final GeoParquetGroupConverter parent; private final int index; - protected GeoParquetGroup current; + protected GeoParquetGroupImpl current; private Converter[] converters; GeoParquetGroupConverter(GeoParquetGroupConverter parent, int index, @@ -60,7 +60,7 @@ class GeoParquetGroupConverter extends GroupConverter { @Override public void end() {} - public GeoParquetGroup getCurrentRecord() { + public 
GeoParquetGroupImpl getCurrentRecord() { return current; } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java index 6c7d5a4f..c56e0aad 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupFactory.java @@ -17,18 +17,21 @@ package org.apache.baremaps.geoparquet.data; -import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.GroupType; public class GeoParquetGroupFactory { - private final MessageType schema; + private final GroupType schema; - public GeoParquetGroupFactory(MessageType schema) { + private final GeoParquetMetadata metadata; + + public GeoParquetGroupFactory(GroupType schema, GeoParquetMetadata metadata) { this.schema = schema; + this.metadata = metadata; } - public GeoParquetGroup newGroup() { - return new GeoParquetGroup(schema); + public GeoParquetGroupImpl newGroup() { + return new GeoParquetGroupImpl(schema, metadata); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java similarity index 71% copy from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java copy to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java index 53b0b8f0..14f6bad2 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java @@ -28,59 +28,45 @@ import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.WKBWriter; -public 
class GeoParquetGroup { +public class GeoParquetGroupImpl { - private final GroupType groupType; + private final GroupType schema; + + private final GeoParquetMetadata metadata; private final List<Object>[] data; @SuppressWarnings("unchecked") - public GeoParquetGroup(GroupType groupType) { - this.groupType = groupType; - this.data = new List[groupType.getFields().size()]; - for (int i = 0; i < groupType.getFieldCount(); i++) { + public GeoParquetGroupImpl(GroupType schema, GeoParquetMetadata metadata) { + this.schema = schema; + this.metadata = metadata; + this.data = new List[schema.getFields().size()]; + for (int i = 0; i < schema.getFieldCount(); i++) { this.data[i] = new ArrayList<>(); } } - public GroupType getGroupType() { - return groupType; + public GroupType getSchema() { + return schema; } - public GeoParquetGroup addGroup(int fieldIndex) { - GeoParquetGroup g = new GeoParquetGroup(groupType.getType(fieldIndex).asGroupType()); + public GeoParquetGroupImpl addGroup(int fieldIndex) { + GeoParquetGroupImpl g = + new GeoParquetGroupImpl(schema.getType(fieldIndex).asGroupType(), metadata); add(fieldIndex, g); return g; } - public GeoParquetGroup addGroup(String field) { - return addGroup(getGroupType().getFieldIndex(field)); + public GeoParquetGroupImpl addGroup(String field) { + return addGroup(getSchema().getFieldIndex(field)); } - public GeoParquetGroup getGroup(int fieldIndex, int index) { - return (GeoParquetGroup) getValue(fieldIndex, index); + public GeoParquetGroupImpl getGroup(int fieldIndex, int index) { + return (GeoParquetGroupImpl) getValue(fieldIndex, index); } - public GeoParquetGroup getGroup(String field, int index) { - return getGroup(getGroupType().getFieldIndex(field), index); - } - - private Object getValue(int fieldIndex, int index) { - List<Object> list; - try { - list = data[fieldIndex]; - } catch (IndexOutOfBoundsException e) { - throw new RuntimeException( - "not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) - + ") 
in group:\n" + this); - } - try { - return list.get(index); - } catch (IndexOutOfBoundsException e) { - throw new RuntimeException( - "not found " + fieldIndex + "(" + groupType.getFieldName(fieldIndex) - + ") element number " + index + " in group:\n" + this); - } + public GeoParquetGroupImpl getGroup(String field, int index) { + return getGroup(getSchema().getFieldIndex(field), index); } public int getFieldRepetitionCount(int fieldIndex) { @@ -120,8 +106,8 @@ public class GeoParquetGroup { return ((BinaryValue) getValue(fieldIndex, index)).getBinary(); } - public NanoTime getTimeNanos(int fieldIndex, int index) { - return NanoTime.fromInt96((Int96Value) getValue(fieldIndex, index)); + public NanoTimeValue getTimeNanos(int fieldIndex, int index) { + return NanoTimeValue.fromInt96((Int96Value) getValue(fieldIndex, index)); } public Binary getInt96(int fieldIndex, int index) { @@ -137,8 +123,26 @@ public class GeoParquetGroup { } } + private Object getValue(int fieldIndex, int index) { + List<Object> list; + try { + list = data[fieldIndex]; + } catch (IndexOutOfBoundsException e) { + throw new RuntimeException( + "not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + + ") in group:\n" + this); + } + try { + return list.get(index); + } catch (IndexOutOfBoundsException e) { + throw new RuntimeException( + "not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + + ") element number " + index + " in group:\n" + this); + } + } + private void add(int fieldIndex, Primitive value) { - Type type = groupType.getType(fieldIndex); + Type type = schema.getType(fieldIndex); List<Object> list = data[fieldIndex]; if (!type.isRepetition(Type.Repetition.REPEATED) && !list.isEmpty()) { @@ -160,7 +164,7 @@ public class GeoParquetGroup { add(fieldIndex, new BinaryValue(Binary.fromString(value))); } - public void add(int fieldIndex, NanoTime value) { + public void add(int fieldIndex, NanoTimeValue value) { add(fieldIndex, value.toInt96()); } @@ -169,7 +173,7 @@ 
public class GeoParquetGroup { } public void add(int fieldIndex, Binary value) { - switch (getGroupType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { + switch (getSchema().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { case BINARY: case FIXED_LEN_BYTE_ARRAY: add(fieldIndex, new BinaryValue(value)); @@ -179,7 +183,7 @@ public class GeoParquetGroup { break; default: throw new UnsupportedOperationException( - getGroupType().asPrimitiveType().getName() + " not supported for Binary"); + getSchema().asPrimitiveType().getName() + " not supported for Binary"); } } @@ -191,7 +195,7 @@ public class GeoParquetGroup { add(fieldIndex, new DoubleValue(value)); } - public void add(int fieldIndex, GeoParquetGroup value) { + public void add(int fieldIndex, GeoParquetGroupImpl value) { data[fieldIndex].add(value); } @@ -201,47 +205,47 @@ public class GeoParquetGroup { } public void add(String field, int value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } public void add(String field, long value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } public void add(String field, float value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } public void add(String field, double value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } public void add(String field, String value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } - public void add(String field, NanoTime value) { - add(getGroupType().getFieldIndex(field), value); + public void add(String field, NanoTimeValue value) { + add(getSchema().getFieldIndex(field), value); } public void add(String field, boolean value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } public void 
add(String field, Binary value) { - add(getGroupType().getFieldIndex(field), value); + add(getSchema().getFieldIndex(field), value); } - public void add(String field, GeoParquetGroup value) { - add(getGroupType().getFieldIndex(field), value); + public void add(String field, GeoParquetGroupImpl value) { + add(getSchema().getFieldIndex(field), value); } public void add(String field, Geometry geometry) { byte[] bytes = new WKBWriter().write(geometry); - add(getGroupType().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); + add(getSchema().getFieldIndex(field), Binary.fromConstantByteArray(bytes)); } - public void writeValue(int field, int index, RecordConsumer recordConsumer) { + void writeValue(int field, int index, RecordConsumer recordConsumer) { ((Primitive) getValue(field, index)).writeValue(recordConsumer); } @@ -250,9 +254,15 @@ public class GeoParquetGroup { return toString(""); } + public String toString(String indent) { + StringBuilder builder = new StringBuilder(); + appendToString(builder, indent); + return builder.toString(); + } + private void appendToString(StringBuilder builder, String indent) { int i = 0; - for (Type field : groupType.getFields()) { + for (Type field : schema.getFields()) { String name = field.getName(); List<Object> values = data[i]; ++i; @@ -261,9 +271,9 @@ public class GeoParquetGroup { builder.append(indent).append(name); if (value == null) { builder.append(": NULL\n"); - } else if (value instanceof GeoParquetGroup) { + } else if (value instanceof GeoParquetGroupImpl) { builder.append('\n'); - ((GeoParquetGroup) value).appendToString(builder, indent + " "); + ((GeoParquetGroupImpl) value).appendToString(builder, indent + " "); } else { builder.append(": ").append(value.toString()).append('\n'); } @@ -271,11 +281,4 @@ public class GeoParquetGroup { } } } - - public String toString(String indent) { - StringBuilder builder = new StringBuilder(); - appendToString(builder, indent); - return builder.toString(); - } - } diff 
--git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java index 4fbdbc32..b114236a 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java @@ -31,26 +31,26 @@ public class GeoParquetGroupWriter { this.schema = schema; } - public void write(GeoParquetGroup geoParquetGroup) { + public void write(GeoParquetGroupImpl group) { recordConsumer.startMessage(); - writeGroup(geoParquetGroup, schema); + writeGroup(group, schema); recordConsumer.endMessage(); } - private void writeGroup(GeoParquetGroup geoParquetGroup, GroupType type) { + private void writeGroup(GeoParquetGroupImpl group, GroupType type) { int fieldCount = type.getFieldCount(); for (int field = 0; field < fieldCount; ++field) { - int valueCount = geoParquetGroup.getFieldRepetitionCount(field); + int valueCount = group.getFieldRepetitionCount(field); if (valueCount > 0) { Type fieldType = type.getType(field); String fieldName = fieldType.getName(); recordConsumer.startField(fieldName, field); for (int index = 0; index < valueCount; ++index) { if (fieldType.isPrimitive()) { - geoParquetGroup.writeValue(field, index, recordConsumer); + group.writeValue(field, index, recordConsumer); } else { recordConsumer.startGroup(); - writeGroup(geoParquetGroup.getGroup(field, index), fieldType.asGroupType()); + writeGroup(group.getGroup(field, index), fieldType.asGroupType()); recordConsumer.endGroup(); } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java index 1847ff6f..a01ba90f 100644 --- 
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMaterializer.java @@ -21,24 +21,24 @@ import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -public class GeoParquetMaterializer extends RecordMaterializer<GeoParquetGroup> { +public class GeoParquetMaterializer extends RecordMaterializer<GeoParquetGroupImpl> { - private final GeoParquetGroupFactory geoParquetGroupFactory; + private final GeoParquetGroupFactory groupFactory; private GeoParquetGroupConverter root; - public GeoParquetMaterializer(MessageType schema) { - this.geoParquetGroupFactory = new GeoParquetGroupFactory(schema); + public GeoParquetMaterializer(MessageType schema, GeoParquetMetadata metadata) { + this.groupFactory = new GeoParquetGroupFactory(schema, metadata); this.root = new GeoParquetGroupConverter(null, 0, schema) { @Override public void start() { - this.current = geoParquetGroupFactory.newGroup(); + this.current = groupFactory.newGroup(); } }; } @Override - public GeoParquetGroup getCurrentRecord() { + public GeoParquetGroupImpl getCurrentRecord() { return root.getCurrentRecord(); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTime.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java similarity index 86% rename from baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTime.java rename to baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java index 64eb2864..b6037367 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTime.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/NanoTimeValue.java @@ -23,25 +23,25 @@ import org.apache.parquet.Preconditions; import 
org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; -public class NanoTime extends Primitive { +public class NanoTimeValue extends Primitive { private final int julianDay; private final long timeOfDayNanos; - public static NanoTime fromBinary(Binary bytes) { + public static NanoTimeValue fromBinary(Binary bytes) { Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes"); ByteBuffer buf = bytes.toByteBuffer(); buf.order(ByteOrder.LITTLE_ENDIAN); long timeOfDayNanos = buf.getLong(); int julianDay = buf.getInt(); - return new NanoTime(julianDay, timeOfDayNanos); + return new NanoTimeValue(julianDay, timeOfDayNanos); } - public static NanoTime fromInt96(Int96Value int96) { + public static NanoTimeValue fromInt96(Int96Value int96) { ByteBuffer buf = int96.getInt96().toByteBuffer(); - return new NanoTime(buf.getInt(), buf.getLong()); + return new NanoTimeValue(buf.getInt(), buf.getLong()); } - public NanoTime(int julianDay, long timeOfDayNanos) { + public NanoTimeValue(int julianDay, long timeOfDayNanos) { this.julianDay = julianDay; this.timeOfDayNanos = timeOfDayNanos; } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java deleted file mode 100644 index 2e4e6749..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetInputFormat.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.parquet.hadoop.ParquetInputFormat; - -/** - * Example input format to read Parquet files - * - * This Input format uses a rather inefficient data model but works independently of higher level - * abstractions. - */ -public class GeoParquetInputFormat extends ParquetInputFormat<GeoParquetGroup> { - - public GeoParquetInputFormat() { - super(GeoParquetReadSupport.class); - } - -} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java deleted file mode 100644 index a875f427..00000000 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetReadSupport.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.baremaps.geoparquet.hadoop; - -import java.util.Map; -import org.apache.baremaps.geoparquet.data.GeoParquetGroup; -import org.apache.baremaps.geoparquet.data.GeoParquetMaterializer; -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.io.api.RecordMaterializer; -import org.apache.parquet.schema.MessageType; - -public class GeoParquetReadSupport extends ReadSupport<GeoParquetGroup> { - - @Override - public ReadContext init( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema) { - String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA); - MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString); - return new ReadContext(requestedProjection); - } - - @Override - public RecordMaterializer<GeoParquetGroup> prepareForRead( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - return new GeoParquetMaterializer(readContext.getRequestedSchema()); - } - -} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index 5ac331f5..4e7f1442 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -19,8 +19,9 @@ package 
org.apache.baremaps.geoparquet; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import org.apache.baremaps.testing.TestFiles; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; import org.junit.jupiter.api.Test; class GeoParquetReaderTest { @@ -30,11 +31,17 @@ class GeoParquetReaderTest { // URI geoParquet = new // URI("s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet"); URI geoParquet = TestFiles.GEOPARQUET.toUri(); - System.out.println(geoParquet); + GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); geoParquetReader.read().forEach(group -> { - System.out.println("--------------------"); - System.out.println(group); + GroupType schema = group.getSchema(); + for (int i = 0; i < schema.getFieldCount(); i++) { + Type fieldType = schema.getType(i); + if (fieldType.isPrimitive()) { + System.out.println(fieldType.asPrimitiveType().getPrimitiveTypeName()); + } + + } }); } }
