This is an automated email from the ASF dual-hosted git repository.
bchapuis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-baremaps.git
The following commit(s) were added to refs/heads/main by this push:
new db7ef92f Code and sonar cleanups (#858)
db7ef92f is described below
commit db7ef92f2a3948cda7bf232a7a6b24adde05b1c1
Author: sebr72 <[email protected]>
AuthorDate: Fri May 31 10:29:58 2024 +0200
Code and sonar cleanups (#858)
* Move duplicated code to superclass
* Correctly close the reader
* Remove unnecessary constructor
* Reset the current group in the converter's end() method.
* Use GeoParquet's Runtime Exception
* Remove, reorder and refactor code
* Avoid using the restricted identifier `record` as a parameter name
* Add new assertion
---
.../baremaps/geoparquet/GeoParquetReader.java | 64 ++++++++++++----------
.../GroupWriter.java} | 13 ++---
.../geoparquet/data/GeoParquetColumnMetadata.java | 2 -
.../geoparquet/data/GeoParquetGroupConverter.java | 6 +-
.../geoparquet/data/GeoParquetGroupImpl.java | 46 +++++++++-------
.../geoparquet/data/GeoParquetGroupWriter.java | 39 +------------
.../geoparquet/data/GeoParquetMetadata.java | 5 +-
.../baremaps/geoparquet/data/Int96Value.java | 2 +-
.../apache/baremaps/geoparquet/data/Primitive.java | 3 +-
.../hadoop/GeoParquetGroupReadSupport.java | 6 +-
.../hadoop/GeoParquetGroupWriteSupport.java | 10 ++--
.../geoparquet/hadoop/GeoParquetGroupWriter.java | 39 +------------
.../baremaps/geoparquet/GeoParquetReaderTest.java | 37 +++++++++++--
13 files changed, 121 insertions(+), 151 deletions(-)
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
index ee8a7da5..4e760d8b 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java
@@ -91,7 +91,7 @@ public class GeoParquetReader {
return files().values().stream().map(FileInfo::recordCount).reduce(0L,
Long::sum);
}
- private Map<FileStatus, FileInfo> files() throws URISyntaxException {
+ private synchronized Map<FileStatus, FileInfo> files() throws
URISyntaxException {
try {
if (files == null) {
files = new HashMap<>();
@@ -101,22 +101,7 @@ public class GeoParquetReader {
// Iterate over all the files in the path
for (FileStatus file : fileSystem.globStatus(globPath)) {
- ParquetFileReader reader = ParquetFileReader.open(configuration,
file.getPath());
- Long recordCount = reader.getRecordCount();
- MessageType messageType = reader.getFileMetaData().getSchema();
- Map<String, String> keyValueMetadata =
reader.getFileMetaData().getKeyValueMetaData();
- GeoParquetMetadata geoParquetMetadata = null;
- GeoParquetGroup.Schema geoParquetSchema = null;
- if (keyValueMetadata.containsKey("geo")) {
- ObjectMapper objectMapper = new ObjectMapper();
-
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- geoParquetMetadata =
- objectMapper.readValue(keyValueMetadata.get("geo"),
GeoParquetMetadata.class);
- geoParquetSchema =
- GeoParquetGroupFactory.createGeoParquetSchema(messageType,
geoParquetMetadata);
- }
- files.put(file, new FileInfo(file, recordCount, keyValueMetadata,
messageType,
- geoParquetMetadata, geoParquetSchema));
+ files.put(file, buildFileInfo(file));
}
// Verify that the files all have the same schema
@@ -135,6 +120,29 @@ public class GeoParquetReader {
return files;
}
+ private FileInfo buildFileInfo(FileStatus file) throws IOException {
+ long recordCount;
+ MessageType messageType;
+ Map<String, String> keyValueMetadata;
+ try (ParquetFileReader reader = ParquetFileReader.open(configuration,
file.getPath())) {
+ recordCount = reader.getRecordCount();
+ messageType = reader.getFileMetaData().getSchema();
+ keyValueMetadata = reader.getFileMetaData().getKeyValueMetaData();
+ }
+ GeoParquetMetadata geoParquetMetadata = null;
+ Schema geoParquetSchema = null;
+ if (keyValueMetadata.containsKey("geo")) {
+ ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ geoParquetMetadata =
+ objectMapper.readValue(keyValueMetadata.get("geo"),
GeoParquetMetadata.class);
+ geoParquetSchema =
+ GeoParquetGroupFactory.createGeoParquetSchema(messageType,
geoParquetMetadata);
+ }
+ return new FileInfo(file, recordCount, keyValueMetadata, messageType,
+ geoParquetMetadata, geoParquetSchema);
+ }
+
public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
return StreamSupport.stream(
new GeoParquetGroupSpliterator(files()),
@@ -150,7 +158,7 @@ public class GeoParquetReader {
private final Queue<FileStatus> queue;
private final Map<FileStatus, FileInfo> files;
- private FileStatus file;
+ private FileStatus fileStatus;
private ParquetReader<GeoParquetGroup> reader;
@@ -164,18 +172,18 @@ public class GeoParquetReader {
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
- if (file == null) {
- file = queue.poll();
+ if (fileStatus == null) {
+ fileStatus = queue.poll();
}
// If there are no more files, return false
- if (file == null) {
+ if (fileStatus == null) {
return false;
}
// Create a new reader if it does not exist
if (reader == null) {
- reader = createParquetReader(file);
+ reader = createParquetReader(fileStatus);
}
// Read the next group
@@ -185,7 +193,7 @@ public class GeoParquetReader {
if (group == null) {
reader.close();
reader = null;
- file = null;
+ fileStatus = null;
// Try to advance again
return tryAdvance(action);
@@ -218,16 +226,16 @@ public class GeoParquetReader {
@Override
public Spliterator<GeoParquetGroup> trySplit() {
- // Create a new spliterator by polling the next file
- FileStatus file = queue.poll();
+ // Create a new spliterator by polling the next polledFileStatus
+ FileStatus polledFileStatus = queue.poll();
// If there are no more files, tell the caller that there is nothing to
split anymore
- if (file == null) {
+ if (polledFileStatus == null) {
return null;
}
- // Return a new spliterator with the file
- return new GeoParquetGroupSpliterator(Map.of(file, files.get(file)));
+ // Return a new spliterator with the polledFileStatus
+ return new GeoParquetGroupSpliterator(Map.of(polledFileStatus,
files.get(polledFileStatus)));
}
@Override
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java
similarity index 87%
copy from
baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
copy to
baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java
index 8c0afd65..09fc039b 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/common/GroupWriter.java
@@ -15,24 +15,23 @@
* limitations under the License.
*/
-package org.apache.baremaps.geoparquet.hadoop;
+package org.apache.baremaps.geoparquet.common;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
-public class GeoParquetGroupWriter {
+public abstract class GroupWriter {
+ protected final RecordConsumer recordConsumer;
+ protected final GroupType schema;
- private final RecordConsumer recordConsumer;
- private final GroupType schema;
-
- public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType
schema) {
+ protected GroupWriter(RecordConsumer recordConsumer, GroupType schema) {
this.recordConsumer = recordConsumer;
this.schema = schema;
}
- public void write(GeoParquetGroupImpl group) {
+ public final void write(GeoParquetGroupImpl group) {
recordConsumer.startMessage();
writeGroup(group, schema);
recordConsumer.endMessage();
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java
index 223ebd79..1638d287 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetColumnMetadata.java
@@ -42,8 +42,6 @@ public class GeoParquetColumnMetadata {
@JsonProperty("bbox")
private Double[] bbox;
- public GeoParquetColumnMetadata() {}
-
public String getEncoding() {
return encoding;
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java
index c2a04f4d..6649aebd 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupConverter.java
@@ -27,7 +27,7 @@ class GeoParquetGroupConverter extends GroupConverter {
private final GeoParquetGroupConverter parent;
private final int index;
protected GeoParquetGroupImpl current;
- private Converter[] converters;
+ private final Converter[] converters;
GeoParquetGroupConverter(GeoParquetGroupConverter parent, int index,
GroupType schema) {
@@ -58,7 +58,9 @@ class GeoParquetGroupConverter extends GroupConverter {
}
@Override
- public void end() {}
+ public void end() {
+ current = null;
+ }
public GeoParquetGroupImpl getCurrentRecord() {
return current;
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
index f00b2533..2cd49058 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupImpl.java
@@ -19,6 +19,7 @@ package org.apache.baremaps.geoparquet.data;
import java.util.ArrayList;
import java.util.List;
+import org.apache.baremaps.geoparquet.GeoParquetException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
@@ -37,7 +38,6 @@ public class GeoParquetGroupImpl implements GeoParquetGroup {
private final List<?>[] data;
- @SuppressWarnings("unchecked")
public GeoParquetGroupImpl(GroupType schema, GeoParquetMetadata metadata,
Schema geoParquetSchema) {
this.schema = schema;
@@ -115,31 +115,38 @@ public class GeoParquetGroupImpl implements
GeoParquetGroup {
try {
return new WKBReader().read(bytes);
} catch (ParseException e) {
- throw new RuntimeException(e);
+ throw new GeoParquetException("WKBReader failed to parse", e);
}
}
private Object getValue(int fieldIndex, int index) {
- List<?> list;
- try {
- list = data[fieldIndex];
- } catch (IndexOutOfBoundsException e) {
- throw new RuntimeException(
- "not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex)
- + ") in group:\n" + this);
- }
+ List<?> list = getObjects(fieldIndex);
try {
return list.get(index);
} catch (IndexOutOfBoundsException e) {
- throw new RuntimeException(
- "not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex)
- + ") element number " + index + " in group:\n" + this);
+ String elementText = String.format(" element number %d ", index);
+ throw createGeoParquetException(fieldIndex, elementText);
}
}
+ private List<?> getObjects(int fieldIndex) {
+ List<?> list;
+ if (fieldIndex < 0 || fieldIndex >= data.length) {
+ throw createGeoParquetException(fieldIndex, "");
+ }
+ list = data[fieldIndex];
+ return list;
+ }
+
+ private GeoParquetException createGeoParquetException(int fieldIndex, String
elementText) {
+ String msg = String.format("Not found %d (%s)%s in group%n%s", fieldIndex,
+ schema.getFieldName(fieldIndex), elementText, this);
+ return new GeoParquetException(msg);
+ }
+
private void add(int fieldIndex, Primitive value) {
org.apache.parquet.schema.Type type = schema.getType(fieldIndex);
- List list = data[fieldIndex];
+ List list = getObjects(fieldIndex);
if (!type.isRepetition(org.apache.parquet.schema.Type.Repetition.REPEATED)
&& !list.isEmpty()) {
throw new IllegalStateException("field " + fieldIndex + " (" +
type.getName()
@@ -166,8 +173,7 @@ public class GeoParquetGroupImpl implements GeoParquetGroup
{
public void add(int fieldIndex, Binary value) {
switch
(getParquetSchema().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName())
{
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
+ case BINARY, FIXED_LEN_BYTE_ARRAY:
add(fieldIndex, new BinaryValue(value));
break;
case INT96:
@@ -260,11 +266,11 @@ public class GeoParquetGroupImpl implements
GeoParquetGroup {
builder.append(indent).append(name);
if (value == null) {
builder.append(": NULL\n");
- } else if (value instanceof GeoParquetGroupImpl) {
+ } else if (value instanceof GeoParquetGroupImpl geoParquetGroupImpl)
{
builder.append('\n');
- ((GeoParquetGroupImpl) value).appendToString(builder, indent + "
");
+ geoParquetGroupImpl.appendToString(builder, indent + " ");
} else {
- builder.append(": ").append(value.toString()).append('\n');
+ builder.append(": ").append(value).append('\n');
}
}
}
@@ -402,7 +408,7 @@ public class GeoParquetGroupImpl implements GeoParquetGroup
{
try {
geometries.add(new WKBReader().read(binary.getBytes()));
} catch (ParseException e) {
- throw new RuntimeException(e);
+ throw new GeoParquetException("WKBReader failed to parse.", e);
}
}
return geometries;
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java
index b114236a..911a3dda 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetGroupWriter.java
@@ -17,45 +17,12 @@
package org.apache.baremaps.geoparquet.data;
+import org.apache.baremaps.geoparquet.common.GroupWriter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
-
-public class GeoParquetGroupWriter {
-
- private final RecordConsumer recordConsumer;
- private final GroupType schema;
+public class GeoParquetGroupWriter extends GroupWriter {
public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType
schema) {
- this.recordConsumer = recordConsumer;
- this.schema = schema;
- }
-
- public void write(GeoParquetGroupImpl group) {
- recordConsumer.startMessage();
- writeGroup(group, schema);
- recordConsumer.endMessage();
- }
-
- private void writeGroup(GeoParquetGroupImpl group, GroupType type) {
- int fieldCount = type.getFieldCount();
- for (int field = 0; field < fieldCount; ++field) {
- int valueCount = group.getFieldRepetitionCount(field);
- if (valueCount > 0) {
- Type fieldType = type.getType(field);
- String fieldName = fieldType.getName();
- recordConsumer.startField(fieldName, field);
- for (int index = 0; index < valueCount; ++index) {
- if (fieldType.isPrimitive()) {
- group.writeValue(field, index, recordConsumer);
- } else {
- recordConsumer.startGroup();
- writeGroup(group.getGroup(field, index), fieldType.asGroupType());
- recordConsumer.endGroup();
- }
- }
- recordConsumer.endField(fieldName, field);
- }
- }
+ super(recordConsumer, schema);
}
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java
index eccbedc6..43c9f032 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/GeoParquetMetadata.java
@@ -33,8 +33,6 @@ public class GeoParquetMetadata {
@JsonProperty("columns")
private Map<String, GeoParquetColumnMetadata> columns;
- public GeoParquetMetadata() {}
-
public String getVersion() {
return version;
}
@@ -61,7 +59,7 @@ public class GeoParquetMetadata {
public int getSrid(String column) {
JsonNode crsId = getColumns().get(column).getCrs().get("id");
- int srid = switch (crsId.get("authority").asText()) {
+ return switch (crsId.get("authority").asText()) {
case "OGC" -> switch (crsId.get("code").asText()) {
case "CRS84" -> 4326;
default -> 0;
@@ -69,7 +67,6 @@ public class GeoParquetMetadata {
case "EPSG" -> crsId.get("code").asInt();
default -> 0;
};
- return srid;
}
public boolean isGeometryColumn(String column) {
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java
index d29588ec..33576e5c 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Int96Value.java
@@ -40,6 +40,6 @@ public class Int96Value extends Primitive {
@Override
public String toString() {
- return "Int96Value{" + String.valueOf(value) + "}";
+ return "Int96Value{" + value + "}";
}
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java
index d6fdb9ec..79bd9517 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/data/Primitive.java
@@ -59,6 +59,5 @@ public abstract class Primitive {
throw new UnsupportedOperationException();
}
- abstract public void writeValue(RecordConsumer recordConsumer);
-
+ public abstract void writeValue(RecordConsumer recordConsumer);
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java
index 51b575f8..21dfc376 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupReadSupport.java
@@ -17,9 +17,11 @@
package org.apache.baremaps.geoparquet.hadoop;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
+import org.apache.baremaps.geoparquet.GeoParquetException;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.data.GeoParquetGroupRecordConverter;
import org.apache.baremaps.geoparquet.data.GeoParquetMetadata;
@@ -51,8 +53,8 @@ public class GeoParquetGroupReadSupport extends
ReadSupport<GeoParquetGroup> {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.readValue(json, GeoParquetMetadata.class);
return new
GeoParquetGroupRecordConverter(readContext.getRequestedSchema(), metadata);
- } catch (Exception e) {
- throw new RuntimeException("Failed to read the GeoParquet metadata of
the Parquet file", e);
+ } catch (JsonProcessingException e) {
+ throw new GeoParquetException("Failed to read GeoParquet's metadata of
the Parquet file", e);
}
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java
index 2ee7ad63..b28607d6 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriteSupport.java
@@ -43,14 +43,14 @@ public class GeoParquetGroupWriteSupport extends
WriteSupport<GeoParquetGroupImp
private MessageType schema;
private GeoParquetGroupWriter groupWriter;
- private Map<String, String> extraMetaData;
+ private final Map<String, String> extraMetaData;
public GeoParquetGroupWriteSupport() {
- this(null, new HashMap<String, String>());
+ this(null, new HashMap<>());
}
GeoParquetGroupWriteSupport(MessageType schema) {
- this(schema, new HashMap<String, String>());
+ this(schema, new HashMap<>());
}
GeoParquetGroupWriteSupport(MessageType schema, Map<String, String>
extraMetaData) {
@@ -78,8 +78,8 @@ public class GeoParquetGroupWriteSupport extends
WriteSupport<GeoParquetGroupImp
}
@Override
- public void write(GeoParquetGroupImpl record) {
- groupWriter.write(record);
+ public void write(GeoParquetGroupImpl group) {
+ groupWriter.write(group);
}
}
diff --git
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
index 8c0afd65..6e579ca9 100644
---
a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
+++
b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/hadoop/GeoParquetGroupWriter.java
@@ -17,46 +17,13 @@
package org.apache.baremaps.geoparquet.hadoop;
-import org.apache.baremaps.geoparquet.data.GeoParquetGroupImpl;
+import org.apache.baremaps.geoparquet.common.GroupWriter;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.Type;
-public class GeoParquetGroupWriter {
-
- private final RecordConsumer recordConsumer;
- private final GroupType schema;
+public class GeoParquetGroupWriter extends GroupWriter {
public GeoParquetGroupWriter(RecordConsumer recordConsumer, GroupType
schema) {
- this.recordConsumer = recordConsumer;
- this.schema = schema;
- }
-
- public void write(GeoParquetGroupImpl group) {
- recordConsumer.startMessage();
- writeGroup(group, schema);
- recordConsumer.endMessage();
- }
-
- private void writeGroup(GeoParquetGroupImpl group, GroupType type) {
- int fieldCount = type.getFieldCount();
- for (int field = 0; field < fieldCount; ++field) {
- int valueCount = group.getFieldRepetitionCount(field);
- if (valueCount > 0) {
- Type fieldType = type.getType(field);
- String fieldName = fieldType.getName();
- recordConsumer.startField(fieldName, field);
- for (int index = 0; index < valueCount; ++index) {
- if (fieldType.isPrimitive()) {
- group.writeValue(field, index, recordConsumer);
- } else {
- recordConsumer.startGroup();
- writeGroup(group.getGroup(field, index), fieldType.asGroupType());
- recordConsumer.endGroup();
- }
- }
- recordConsumer.endField(fieldName, field);
- }
- }
+ super(recordConsumer, schema);
}
}
diff --git
a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
index 2039937e..bd5dfa8f 100644
---
a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
+++
b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java
@@ -17,26 +17,51 @@
package org.apache.baremaps.geoparquet;
+import static org.junit.jupiter.api.Assertions.*;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.baremaps.testing.TestFiles;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class GeoParquetReaderTest {
@Test
void read() throws IOException, URISyntaxException {
- // URI geoParquet = new
- //
URI("s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
URI geoParquet = TestFiles.GEOPARQUET.toUri();
+ final boolean isPrintingContent = true;
+ final int expectedGroupCount = 5;
+
+ readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+ }
+
+ @Disabled("Takes too long. Around 10 minutes.")
+ @Test
+ void readExternal() throws IOException, URISyntaxException {
+ URI geoParquet = new URI(
+
"s3a://overturemaps-us-west-2/release/2024-03-12-alpha.0/theme=admins/type=locality_area/*.parquet");
+ final boolean isPrintingContent = false;
+ final int expectedGroupCount = 974708;
+ readGroups(geoParquet, isPrintingContent, expectedGroupCount);
+ }
+
+ private static void readGroups(URI geoParquet, boolean isPrintingContent,
int expectedGroupCount)
+ throws IOException, URISyntaxException {
GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet);
+ final AtomicInteger groupCount = new AtomicInteger();
geoParquetReader.read().forEach(group -> {
- System.out.println("-----");
- System.out.println(group.getSchema());
- System.out.println(group.getGeometryValue("geometry"));
-
+ groupCount.getAndIncrement();
+ if (isPrintingContent) {
+ System.out.println("-----");
+ System.out.println(group.getSchema());
+ System.out.println(group.getGeometryValue("geometry"));
+ }
});
+
+ assertEquals(expectedGroupCount, groupCount.get());
}
}