Imbruced commented on a change in pull request #536:
URL: https://github.com/apache/incubator-sedona/pull/536#discussion_r691363414



##########
File path: pom.xml
##########
@@ -171,6 +209,12 @@
             <artifactId>gt-coverage</artifactId>
             <version>${geotools.version}</version>
             <scope>${dependency.scope}</scope>
+            <exclusions>

Review comment:
       Guava ;) always causes issues with dependencies.

##########
File path: 
core/src/main/java/org/apache/sedona/core/geometryObjects/schema/PolygonSchema.java
##########
@@ -0,0 +1,37 @@
+package org.apache.sedona.core.geometryObjects.schema;
+
+import com.google.common.collect.Lists;
+import org.apache.sedona.core.io.avro.constants.AvroConstants;
+import org.apache.sedona.core.io.avro.schema.*;
+import org.apache.sedona.core.io.avro.utils.AvroUtils;
+import org.apache.sedona.core.utils.SedonaUtils;
+
+/**
+ * Polygon Schema class representing AvroSchema of Polygon Geometry
+ */
+public class PolygonSchema extends RecordSchema {
+    public static final String POLYGON = "polygon";
+    public static final String EXTERIOR_RING = "ex";
+    public static final String HOLES = "holes";
+    
+    private PolygonSchema() {
+        super(AvroConstants.SEDONA_NAMESPACE,POLYGON,
+              Lists.newArrayList(new Field(EXTERIOR_RING, 
CoordinateArraySchema.getSchema()),
+                                 new Field(HOLES, new UnionSchema(
+                                         new 
ArraySchema(CoordinateArraySchema.getSchema()),
+                                         new 
SimpleSchema(AvroConstants.PrimitiveDataType.NULL)))));

Review comment:
       Which is the better approach: saving null or an empty list? WDYT?

##########
File path: pom.xml
##########
@@ -351,6 +429,25 @@
                     </execution>
                 </executions>
             </plugin>
+<!--            <plugin>-->

Review comment:
       Can we remove the commented-out blocks?

##########
File path: 
core/src/main/java/org/apache/sedona/core/formatMapper/parquet/ParquetFormatMapper.java
##########
@@ -0,0 +1,215 @@
+package org.apache.sedona.core.formatMapper.parquet;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.FormatMapper;
+import org.apache.sedona.core.geometryObjects.Circle;
+import org.apache.sedona.core.geometryObjects.schema.CircleSchema;
+import org.apache.sedona.core.geometryObjects.schema.CoordinateSchema;
+import org.apache.sedona.core.geometryObjects.schema.PolygonSchema;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.zookeeper.Op;
+import org.locationtech.jts.geom.*;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/**
+ * ParquetFormatMapper class
+ * Responsible for converting Avro-Parquet Records into Geometry Objects
+ * @param <T> Geometry Class Type
+ */
+public class ParquetFormatMapper<T extends Geometry> implements Serializable, 
FlatMapFunction<Iterator<GenericRecord>, T> {
+    final static Logger logger = Logger.getLogger(FormatMapper.class);
+    protected GeometryFactory factory = new GeometryFactory();
+    private GeometryType geometryType;
+    private final String geometryColumn;
+    private final List<String> userColumns;
+    
+    /**
+     *
+     * @param geometryType
+     * @param geometryColumn
+     * @param userColumns
+     */
+    public ParquetFormatMapper(GeometryType geometryType, String 
geometryColumn, List<String> userColumns) {
+        this.geometryType = geometryType;
+        this.geometryColumn = geometryColumn;
+        this.userColumns = userColumns;
+    }
+    
+    /**
+     * Gets Coordinate from Avro Record
+     * @param record Avro Record
+     * @return Coordinate
+     */
+    private static Coordinate getCoordinate(GenericRecord record) {
+        Double x = (Double) record.get(CoordinateSchema.X_COORDINATE);
+        Double y = (Double) record.get(CoordinateSchema.Y_COORDINATE);
+        return new Coordinate(x, y);
+    }
+    
+    /**
+     * Gets Coordinate from Avro based Record
+     * @param val
+     * @return Coordinate
+     */
+    private static Coordinate getCoordinate(GenericContainer val) {
+        if (val instanceof GenericRecord) {
+            return getCoordinate((GenericRecord) val);
+        }
+        return getCoordinate((GenericArray) val);
+    }
+    /**
+     * Gets Coordinate from Avro based Array based on the first index
+     * @param array
+     * @return Coordinate
+     */
+    private static Coordinate getCoordinate(GenericArray array) {
+        return getCoordinate((GenericRecord) array.get(0));
+    }
+    
+    /**
+     * Gets Coordinate Array from Avro Record Array
+     * @param array
+     * @return Coordinate Array
+     */
+    private static Coordinate[] getCoordinates(GenericArray array) {
+        Coordinate[] coordinates = new Coordinate[array.size()];
+        for(int i=0;i<array.size();i++){
+            coordinates[i] = getCoordinate((GenericRecord) array.get(i));
+        }
+        return coordinates;
+    }
+    
+    /**
+     * Gets Point Geometry from Coordinate
+     * @param coordinate
+     * @return Point
+     */
+    private Point getPoint(Coordinate coordinate) {
+        return factory.createPoint(coordinate);
+    }
+    
+    /**
+     * Gets Circle Geometry Object from Avro Record
+     * @param record
+     * @return Circle
+     */
+    private Circle getCircle(GenericRecord record) {
+        Coordinate center = getCoordinate((GenericRecord) 
record.get(CircleSchema.CENTER));
+        Double radius = (Double) record.get(CircleSchema.RADIUS);
+        return new Circle(getPoint(center), radius);
+    }
+    /**
+     * Gets Polygon Geometry Object from Avro Record
+     * @param record
+     * @return Polygon
+     */
+    private Polygon getPolygon(GenericRecord record) {
+        LinearRing exteriorRing =
+                factory.createLinearRing(getCoordinates((GenericArray) 
record.get(PolygonSchema.EXTERIOR_RING)));
+        Optional<GenericArray> holes =  Optional.ofNullable((GenericArray) 
record.get(PolygonSchema.HOLES));
+        List<LinearRing> interiorRings = holes.map(arr->IntStream.range(0, 
arr.size())
+                                                            .mapToObj(arr::get)
+                                                            .map(hole -> 
getCoordinates((GenericArray) (hole)))
+                                                            .map(c -> 
factory.createLinearRing(c))
+                                                            
.collect(Collectors.toList())).orElse(Collections.EMPTY_LIST);
+        
+        return factory.createPolygon(exteriorRing, (LinearRing[]) 
interiorRings.toArray(new LinearRing[interiorRings.size()]));
+    }
+    
+    /**
+     * Gets Geometry Object of Geometry Type from Coordinate Array
+     * @param coordinates CoordinateArray
+     * @param type geometryType
+     * @return Geometry Object
+     */
+    private Geometry getGeometry(Coordinate[] coordinates, GeometryType type) {
+        Geometry geometry = null;
+        switch (type) {
+            case POINT:
+                geometry = factory.createPoint(coordinates[0]);
+                break;
+            case POLYGON:
+                geometry = factory.createPolygon(coordinates);

Review comment:
       What if the polygon has holes? Does that change anything?

##########
File path: 
core/src/main/java/org/apache/sedona/core/io/parquet/ParquetFileReader.java
##########
@@ -0,0 +1,60 @@
+package org.apache.sedona.core.io.parquet;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.avro.AvroParquetInputFormat;
+import org.apache.parquet.cli.util.Expressions;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.sedona.core.constants.SedonaConstants;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public class ParquetFileReader {
+    /**
+     * Reads Parquet File with given geometry Column and the relevant User 
columns
+     * @param sc
+     * @param geometryColumn
+     * @param userColumns
+     * @param inputPaths
+     * @return Avro Record RDD which needs to be deserialized into a 
GeometryRDD
+     * @throws IOException
+     */
+    public static JavaRDD<GenericRecord> readFile(JavaSparkContext sc,
+                                                  String geometryColumn,
+                                                  List<String> userColumns,
+                                                  String... inputPaths) throws 
IOException {
+        final Job job = Job.getInstance(sc.hadoopConfiguration());

Review comment:
       Does that mean the current solution works only in the Hadoop ecosystem?

##########
File path: 
core/src/main/java/org/apache/sedona/core/formatMapper/ParquetReader.java
##########
@@ -0,0 +1,38 @@
+package org.apache.sedona.core.formatMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.parquet.ParquetFormatMapper;
+import org.apache.sedona.core.geometryObjects.Circle;
+import org.apache.sedona.core.io.parquet.ParquetFileReader;
+import org.apache.sedona.core.spatialRDD.SpatialRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Polygon;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ParquetReader extends RddReader {
+    public static <T extends Geometry> SpatialRDD<T> createSpatialRDD(JavaRDD 
rawRDD,
+                                                                      
ParquetFormatMapper<T> formatMapper,
+                                                                      
GeometryType geometryType) {
+        SpatialRDD spatialRDD = new SpatialRDD<T>(geometryType);
+        spatialRDD.rawSpatialRDD = rawRDD.mapPartitions(formatMapper);
+        return spatialRDD;
+    }
+    
+    public static <T extends Geometry> SpatialRDD<T> 
readToGeometryRDD(JavaSparkContext sc,
+                                                                       
List<String> inputPath,

Review comment:
       Is it possible to specify a parent path in the case of partitioned files? Can
we add an additional method?

##########
File path: 
core/src/main/java/org/apache/sedona/core/formatMapper/ParquetReader.java
##########
@@ -0,0 +1,38 @@
+package org.apache.sedona.core.formatMapper;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.parquet.ParquetFormatMapper;
+import org.apache.sedona.core.geometryObjects.Circle;
+import org.apache.sedona.core.io.parquet.ParquetFileReader;
+import org.apache.sedona.core.spatialRDD.SpatialRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Polygon;
+
+import java.io.IOException;
+import java.util.List;
+
+public class ParquetReader extends RddReader {
+    public static <T extends Geometry> SpatialRDD<T> createSpatialRDD(JavaRDD 
rawRDD,
+                                                                      
ParquetFormatMapper<T> formatMapper,
+                                                                      
GeometryType geometryType) {
+        SpatialRDD spatialRDD = new SpatialRDD<T>(geometryType);
+        spatialRDD.rawSpatialRDD = rawRDD.mapPartitions(formatMapper);
+        return spatialRDD;
+    }
+    
+    public static <T extends Geometry> SpatialRDD<T> 
readToGeometryRDD(JavaSparkContext sc,

Review comment:
       Is that the final API for the user? Is it possible to read the geometry
column and its type from the Parquet metadata?
   

##########
File path: 
core/src/test/java/org/apache/sedona/core/io/parquet/ParquetFileReaderTest.java
##########
@@ -0,0 +1,87 @@
+package org.apache.sedona.core.io.parquet;
+
+import com.clearspring.analytics.util.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.sedona.core.TestBase;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.exceptions.SedonaException;
+import org.apache.sedona.core.formatMapper.ParquetReader;
+import org.apache.sedona.core.formatMapper.WktReader;
+import org.apache.sedona.core.formatMapper.WktReaderTest;
+import org.apache.sedona.core.spatialRDD.PointRDD;
+import org.apache.sedona.core.spatialRDD.SpatialRDD;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.util.Assert;
+
+import java.io.IOException;
+import java.nio.file.FileSystem;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ParquetFileReaderTest extends TestBase {
+    private static String filePath;
+    @BeforeClass
+    public static void onceExecutedBeforeAll()
+            throws IOException
+    {
+        initialize(ParquetFileReaderTest.class.getName());
+        filePath = 
ParquetFileReaderTest.class.getClassLoader().getResource("crs-test-point.csv").getPath();
+    }
+    
+    @AfterClass
+    public static void tearDown()
+            throws Exception
+    {
+        sc.stop();
+    }
+    
+    @Before
+    public void delete() throws IOException {
+        if(testOutputPathDir.exists()){
+            FileUtils.deleteDirectory(testOutputPathDir);
+        }
+    }
+    
+    @Test
+    public void testParquetReader() throws IOException, SedonaException {
+        PointRDD pointRDD = new PointRDD(sc, filePath, 0, 
FileDataSplitter.CSV, false, 1);
+        pointRDD.saveAsParquet(sc,
+                               "p",
+                               Lists.newArrayList(),
+                               testOutputPathDir.getAbsolutePath(),
+                               "test.namespace",
+                               "name");
+        List<Point> l1 = 
pointRDD.getRawSpatialRDD().collect().stream().collect(Collectors.toList());
+        Collections.sort(l1, new Comparator<Point>() {
+            @Override
+            public int compare(Point o1, Point o2) {
+                return o1.compareTo(o2);
+            }
+        });
+        
+        SpatialRDD<Point> pointRDD1 = ParquetReader.readToGeometryRDD(sc,
+                                                                        
Arrays.stream(testOutputPathDir.listFiles())

Review comment:
       Is it possible to load Parquet files from a parent directory, e.g.
   test
       - *.parquet
       - *.parquet
       - *.parquet
       - *.parquet
   Can we read this like readToGeometryRDD(..., test) instead of listing all
files?

##########
File path: 
core/src/main/java/org/apache/sedona/core/geometryObjects/schema/CoordinateSchema.java
##########
@@ -0,0 +1,39 @@
+package org.apache.sedona.core.geometryObjects.schema;
+
+import com.google.common.collect.Lists;
+import org.apache.sedona.core.io.avro.constants.AvroConstants;
+import org.apache.sedona.core.io.avro.schema.Field;
+import org.apache.sedona.core.io.avro.schema.SimpleSchema;
+import org.apache.sedona.core.io.avro.schema.RecordSchema;
+import org.apache.sedona.core.utils.SedonaUtils;
+
+public class CoordinateSchema extends RecordSchema {
+    public static final String X_COORDINATE = "x";
+    public static final String Y_COORDINATE = "y";
+    public static final String COORDINATE = "coordinate";
+    
+    private CoordinateSchema() {
+        super(AvroConstants.SEDONA_NAMESPACE,COORDINATE,
+              Lists.newArrayList(new Field(X_COORDINATE, new 
SimpleSchema(AvroConstants.PrimitiveDataType.DOUBLE)),
+                                 new Field(Y_COORDINATE, new 
SimpleSchema(AvroConstants.PrimitiveDataType.DOUBLE))));
+    }
+    
+    private static CoordinateSchema schema;
+    
+    public static CoordinateSchema getSchema(){
+        if(SedonaUtils.isNull(schema)){
+            synchronized (CoordinateSchema.class){
+                if(SedonaUtils.isNull(schema)){
+                    schema = new CoordinateSchema();
+                }
+            }
+        }
+        return schema;
+    }
+
+//    public CoordinateSchema(String name, String namespace) {

Review comment:
       Can we remove that commented-out code?

##########
File path: core/src/main/java/org/apache/sedona/core/showcase/Example.java
##########
@@ -52,99 +52,99 @@
 public class Example
         implements Serializable
 {
-
+    
     /**
      * The sc.
      */
     public static JavaSparkContext sc;
-
+    
     /**
      * The geometry factory.
      */
     static GeometryFactory geometryFactory;
-
+    
     /**
      * The Point RDD input location.
      */
     static String PointRDDInputLocation;
-
+    
     /**
      * The Point RDD offset.
      */
     static Integer PointRDDOffset;
-
+    
     /**
      * The Point RDD num partitions.
      */
     static Integer PointRDDNumPartitions;
-
+    
     /**
      * The Point RDD splitter.
      */
     static FileDataSplitter PointRDDSplitter;
-
+    
     /**
      * The Point RDD index type.
      */
     static IndexType PointRDDIndexType;
-
+    

Review comment:
       I don't like these kinds of changes ;)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to