swamirishi commented on a change in pull request #536:
URL: https://github.com/apache/incubator-sedona/pull/536#discussion_r695177738
##########
File path:
core/src/main/java/org/apache/sedona/core/formatMapper/parquet/ParquetFormatMapper.java
##########
@@ -0,0 +1,215 @@
+package org.apache.sedona.core.formatMapper.parquet;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.FormatMapper;
+import org.apache.sedona.core.geometryObjects.Circle;
+import org.apache.sedona.core.geometryObjects.schema.CircleSchema;
+import org.apache.sedona.core.geometryObjects.schema.CoordinateSchema;
+import org.apache.sedona.core.geometryObjects.schema.PolygonSchema;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.zookeeper.Op;
+import org.locationtech.jts.geom.*;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/**
+ * ParquetFormatMapper class
+ * Responsible for converting Avro-Parquet Records into Geometry Objects
+ * @param <T> Geometry Class Type
+ */
+public class ParquetFormatMapper<T extends Geometry> implements Serializable,
FlatMapFunction<Iterator<GenericRecord>, T> {
+ final static Logger logger = Logger.getLogger(FormatMapper.class);
+ protected GeometryFactory factory = new GeometryFactory();
+ private GeometryType geometryType;
+ private final String geometryColumn;
+ private final List<String> userColumns;
+
+ /**
+ *
+ * @param geometryType
+ * @param geometryColumn
+ * @param userColumns
+ */
+ public ParquetFormatMapper(GeometryType geometryType, String
geometryColumn, List<String> userColumns) {
+ this.geometryType = geometryType;
+ this.geometryColumn = geometryColumn;
+ this.userColumns = userColumns;
+ }
+
+ /**
+ * Gets Coordinate from Avro Record
+ * @param record Avro Record
+ * @return Coordinate
+ */
+ private static Coordinate getCoordinate(GenericRecord record) {
+ Double x = (Double) record.get(CoordinateSchema.X_COORDINATE);
+ Double y = (Double) record.get(CoordinateSchema.Y_COORDINATE);
+ return new Coordinate(x, y);
+ }
+
+ /**
+ * Gets Coordinate from Avro based Record
+ * @param val
+ * @return Coordinate
+ */
+ private static Coordinate getCoordinate(GenericContainer val) {
+ if (val instanceof GenericRecord) {
+ return getCoordinate((GenericRecord) val);
+ }
+ return getCoordinate((GenericArray) val);
+ }
+ /**
+ * Gets Coordinate from Avro based Array based on the first index
+ * @param array
+ * @return Coordinate
+ */
+ private static Coordinate getCoordinate(GenericArray array) {
+ return getCoordinate((GenericRecord) array.get(0));
+ }
+
+ /**
+ * Gets Coordinate Array from Avro Record Array
+ * @param array
+ * @return Coordinate Array
+ */
+ private static Coordinate[] getCoordinates(GenericArray array) {
+ Coordinate[] coordinates = new Coordinate[array.size()];
+ for(int i=0;i<array.size();i++){
+ coordinates[i] = getCoordinate((GenericRecord) array.get(i));
+ }
+ return coordinates;
+ }
+
+ /**
+ * Gets Point Geometry from Coordinate
+ * @param coordinate
+ * @return Point
+ */
+ private Point getPoint(Coordinate coordinate) {
+ return factory.createPoint(coordinate);
+ }
+
+ /**
+ * Gets Circle Geometry Object from Avro Record
+ * @param record
+ * @return Circle
+ */
+ private Circle getCircle(GenericRecord record) {
+ Coordinate center = getCoordinate((GenericRecord)
record.get(CircleSchema.CENTER));
+ Double radius = (Double) record.get(CircleSchema.RADIUS);
+ return new Circle(getPoint(center), radius);
+ }
+ /**
+ * Gets Polygon Geometry Object from Avro Record
+ * @param record
+ * @return Polygon
+ */
+ private Polygon getPolygon(GenericRecord record) {
+ LinearRing exteriorRing =
+ factory.createLinearRing(getCoordinates((GenericArray)
record.get(PolygonSchema.EXTERIOR_RING)));
+ Optional<GenericArray> holes = Optional.ofNullable((GenericArray)
record.get(PolygonSchema.HOLES));
+ List<LinearRing> interiorRings = holes.map(arr->IntStream.range(0,
arr.size())
+ .mapToObj(arr::get)
+ .map(hole ->
getCoordinates((GenericArray) (hole)))
+ .map(c ->
factory.createLinearRing(c))
+
.collect(Collectors.toList())).orElse(Collections.EMPTY_LIST);
+
+ return factory.createPolygon(exteriorRing, (LinearRing[])
interiorRings.toArray(new LinearRing[interiorRings.size()]));
+ }
+
+ /**
+ * Gets Geometry Object of Geometry Type from Coordinate Array
+ * @param coordinates CoordinateArray
+ * @param type geometryType
+ * @return Geometry Object
+ */
+ private Geometry getGeometry(Coordinate[] coordinates, GeometryType type) {
+ Geometry geometry = null;
+ switch (type) {
+ case POINT:
+ geometry = factory.createPoint(coordinates[0]);
+ break;
+ case POLYGON:
+ geometry = factory.createPolygon(coordinates);
Review comment:
This is a defailt implementation wherein a user might just want convert
an array of points into a closed polygon without holes. This is similar to
supporting Offsets and number of columns length in text format for geometry
column.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]