Kontinuation commented on code in PR #2185:
URL: https://github.com/apache/sedona/pull/2185#discussion_r2239249416


##########
common/src/main/java/org/apache/sedona/common/geometryObjects/S2GeographyWrapper.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.geometryObjects;
+
+import org.apache.sedona.common.S2Geography.S2Geography;
+import org.locationtech.jts.geom.PrecisionModel;
+
+public abstract class S2GeographyWrapper extends S2Geography {

Review Comment:
   I don't see `S2GeographyWrapper` being used anywhere, can we remove this?



##########
common/src/main/java/org/apache/sedona/common/S2Geography/S2Geography.java:
##########
@@ -140,6 +149,22 @@ public void getCellUnionBound(List<S2CellId> cellIds) {
     region.getCellUnionBound(cellIds);
   }
 
+  @Override
+  public String toString() {
+    return this.toText(new PrecisionModel());
+  }
+
+  public String toString(PrecisionModel precisionModel) {
+    // returns "POINT (x y)"

Review Comment:
   Remove commented code



##########
common/src/main/java/org/apache/sedona/common/S2Constructors.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common;
+
+import org.apache.sedona.common.S2Geography.*;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.common.utils.S2GeographyFormatUtils;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.PrecisionModel;
+import org.locationtech.jts.io.ParseException;
+
+public class S2Constructors {
+
+  public static S2Geography geogFromWKT(String wkt, int srid) throws 
ParseException {
+    if (wkt == null) {
+      return null;
+    }
+    GeometryFactory geometryFactory = new GeometryFactory(new 
PrecisionModel(), srid);
+    return new WKTReader(geometryFactory).read(wkt);
+  }
+
+  public static S2Geography geogFromEWKT(String ewkt) throws ParseException {
+    if (ewkt == null) {
+      return null;
+    }
+    int SRID = 0;
+    String wkt = ewkt;
+
+    int index = ewkt.indexOf("SRID=");
+    if (index != -1) {
+      int semicolonIndex = ewkt.indexOf(';', index);
+      if (semicolonIndex != -1) {
+        SRID = Integer.parseInt(ewkt.substring(index + 5, semicolonIndex));
+        wkt = ewkt.substring(semicolonIndex + 1);
+      } else {
+        throw new ParseException("Invalid EWKT string");
+      }
+    }
+    GeometryFactory geometryFactory = new GeometryFactory(new 
PrecisionModel(), SRID);
+    return new WKTReader(geometryFactory).read(wkt);
+  }
+
+  public static S2Geography geogFromWKB(byte[] wkb) throws ParseException {
+    // return geogFromWKB(wkb, -1);
+    return new WKBReader().read(wkb);
+  }
+
+  public static S2Geography geogFromWKB(byte[] wkb, int SRID) throws 
ParseException {
+    S2Geography geog = new WKBReader().read(wkb);
+    //    if (geog.getFactory().getSRID() != geom.getSRID() || (SRID >= 0 && 
geom.getSRID() !=
+    // SRID)) {
+    //      // Make sure that the geometry and the geometry factory have the 
correct SRID
+    //      if (SRID < 0) {
+    //        SRID = geom.getSRID();
+    //      }
+    //      return Functions.setSRID(geog, SRID);
+    //    } else {
+    return geog;

Review Comment:
   Geography does not have an SRID property for now (it is assumed to be 
OGC84). We can bring it back once we add SRID to S2Geography.



##########
common/src/main/java/org/apache/sedona/common/S2Functions.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common;
+
+import com.google.common.geometry.S2LatLng;
+import com.google.common.geometry.S2Point;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.common.S2Geography.Distance;
+import org.apache.sedona.common.S2Geography.PointGeography;
+import org.apache.sedona.common.S2Geography.S2Geography;
+import org.apache.sedona.common.S2Geography.ShapeIndexGeography;
+
+public class S2Functions {
+
+  public static double distance(S2Geography left, S2Geography right) {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    return Distance.S2_distance((ShapeIndexGeography) indexLeft, 
(ShapeIndexGeography) indexRight);
+  }
+
+  public static double maxDistance(S2Geography left, S2Geography right) {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    return Distance.S2_maxDistance(
+        (ShapeIndexGeography) indexLeft, (ShapeIndexGeography) indexRight);
+  }
+
+  public static S2Geography cloestPoint(S2Geography left, S2Geography right) 
throws Exception {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    S2Geography s2Geography =
+        new PointGeography(
+            Distance.S2_closestPoint(
+                (ShapeIndexGeography) indexLeft, (ShapeIndexGeography) 
indexRight));
+    return s2Geography;
+  }
+
+  public static S2Geography minimumClearanceLineBetween(S2Geography left, 
S2Geography right)
+      throws Exception {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    Pair<S2Point, S2Point> res =
+        Distance.S2_minimumClearanceLineBetween(
+            (ShapeIndexGeography) indexLeft, (ShapeIndexGeography) indexRight);
+    S2Geography geography = new PointGeography(List.of(res.getLeft(), 
res.getRight()));
+    return geography;
+  }
+
+  public static String asWKT(S2Geography geography) {
+    return geography.toString();
+  }
+
+  public static String resultToText(S2Geography pointGeography) {

Review Comment:
   Let's rename it to `geogToText`. The parameter name should be `geography` 
rather than `pointGeography`.



##########
spark/common/src/test/scala/org/apache/sedona/sql/S2constructorTestScala.scala:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.common.S2Geography.{S2Geography}
+import org.locationtech.jts.geom.{PrecisionModel}
+
+class S2constructorTestScala extends TestBaseScala {
+
+  import sparkSession.implicits._
+  val precisionModel: PrecisionModel = new 
PrecisionModel(PrecisionModel.FIXED);
+
+  it("Passed ST_S2PointFromWKB") {
+    val geometryDf = Seq(
+      "0101000000000000000000F03F0000000000000040",
+      
"0102000000020000000000000084d600c00000000080b5d6bf00000060e1eff7bf00000080075de5bf",
+      "010100000000000000000024400000000000002e40",
+      
"0103000000010000000500000000000000000000000000000000000000000000000000f03f000000000000f03f0000000000001440000000000000f03f0000000000001440000000000000000000000000000000000000000000000000")
+      .map(Tuple1.apply)
+      .toDF("wkb")
+
+    geometryDf.createOrReplaceTempView("wkbtable")
+
+    var validPointDf = sparkSession.sql("SELECT 
ST_S2PointFromWKB(wkbtable.wkb) FROM wkbtable")
+    var rows = validPointDf.collect()
+    assert(rows.length == 4)
+
+    var expectedPoints = Seq("POINT (1 2)", null, "POINT (10 15)", null)
+    for (i <- rows.indices) {
+      if (expectedPoints(i) == null) {
+        assert(rows(i).isNullAt(0))
+      } else {
+        assert(rows(i).getAs[S2Geography](0).toString(precisionModel) == 
expectedPoints(i))
+      }
+    }
+  }
+
+  it("Passed ST_S2LinestringFromWKB") {
+    val geometryDf = Seq(
+      
"010200000003000000000000000000000000000000000000000000000000000840000000000000084000000000000010400000000000001040",
+      "0101000000000000000000F03F0000000000000040",
+      
"01020000000300000000000000000000c000000000000000c000000000000010400000000000001040000000000000104000000000000000c0",
+      
"0103000000010000000500000000000000000000000000000000000000000000000000f03f000000000000f03f0000000000001440000000000000f03f0000000000001440000000000000000000000000000000000000000000000000")
+      .map(Tuple1.apply)
+      .toDF("wkb")
+
+    geometryDf.createOrReplaceTempView("wkbtable")
+
+    var validLineDf =
+      sparkSession.sql("SELECT ST_S2LinestringFromWKB(wkbtable.wkb) FROM 
wkbtable")
+    var rows = validLineDf.collect()
+    assert(rows.length == 4)
+
+    var expectedPoints =
+      Seq("LINESTRING (0 0, 3 3, 4 4)", null, "LINESTRING (-2 -2, 4 4, 4 -2)", 
null)
+    for (i <- rows.indices) {
+      if (expectedPoints(i) == null) {
+        assert(rows(i).isNullAt(0))
+      } else {
+        assert(rows(i).getAs[S2Geography](0).toString(precisionModel) == 
expectedPoints(i))
+      }
+    }
+  }
+
+  it("Passed ST_S2GeogFromWKB") {
+    // UTF-8 encoded WKB String
+    val polygonWkbDf = sparkSession.read
+      .format("csv")
+      .option("delimiter", "\t")
+      .option("header", "false")
+      .load("spark/common/src/test/resources/county_small_wkb.tsv")
+    polygonWkbDf.createOrReplaceTempView("polygontable")
+    val polygonDf =
+      sparkSession.sql("select ST_GeomFromWKB(polygontable._c0) as countyshape 
from polygontable")
+    assert(polygonDf.count() == 100)
+    // RAW binary array
+    val wkbSeq = Seq[Array[Byte]](
+      Array[Byte](1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, -124, -42, 0, -64, 0, 
0, 0, 0, -128, -75,
+        -42, -65, 0, 0, 0, 96, -31, -17, -9, -65, 0, 0, 0, -128, 7, 93, -27, 
-65))
+    val rawWkbDf = wkbSeq.toDF("wkb")
+    rawWkbDf.createOrReplaceTempView("rawWKBTable")
+    val geometries = {
+      sparkSession.sql("SELECT ST_S2GeogFromWKB(rawWKBTable.wkb) as 
countyshape from rawWKBTable")
+    }
+    val expectedGeom =
+      "LINESTRING (-2.1047439575195317 -0.35482788085937506, 
-1.4960645437240603 -0.6676061153411864)"
+    assert(
+      geometries
+        .first()
+        .getAs[S2Geography](0)
+        .toString(new PrecisionModel(1e16))
+        .equals(expectedGeom))
+    // null input
+    val nullGeom = sparkSession.sql("SELECT ST_S2GeogFromWKB(null)")
+    assert(nullGeom.first().isNullAt(0))
+    // Fail on wrong input type
+    intercept[Exception] {
+      sparkSession.sql("SELECT ST_S2GeogFromWKB(0)").collect()
+    }
+  }
+
+  it("Passed ST_S2MLineFromText") {
+    var mLineDf =
+      sparkSession.sql("select ST_S2MLineFromText('MULTILINESTRING ((1 2, 3 
4), (4 5, 6 7))')")
+    assert(mLineDf.count() == 1)
+  }
+
+  it("Passed ST_S2MPolyFromText") {
+    var mLineDf = sparkSession.sql(
+      "select ST_MPolyFromText('MULTIPOLYGON(((-70.916 42.1002,-70.9468 
42.0946,-70.9765 42.0872 )))')")
+    assert(mLineDf.count() == 1)
+  }
+
+  it("Passed ST_S2MPointFromText") {
+    val baseDf =
+      sparkSession.sql("SELECT 'MULTIPOINT ((10 10), (20 20), (30 30))' as 
geom, 4326 as srid")
+    var actual =
+      baseDf
+        .selectExpr("ST_S2MPointFromText(geom)")
+        .first()
+        .get(0)
+        .asInstanceOf[S2Geography]
+        .toText(new PrecisionModel(PrecisionModel.FIXED))
+    val expected = "MULTIPOINT ((10 10), (20 20), (30 30))"
+    assert(expected.equals(actual))
+
+//    val actualGeom =
+//      baseDf.selectExpr("ST_S2MPointFromText(geom, 
srid)").first().get(0).asInstanceOf[Geometry]
+//    actual = actualGeom.toText
+//    assert(expected.equals(actual))
+//    val actualSrid = actualGeom.getSRID
+//    assert(4326 == actualSrid)

Review Comment:
   Remove unused code. We can add new tests for SRID once we support it.



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala:
##########
@@ -40,7 +40,7 @@ import org.apache.spark.util.Utils
 import com.mapzen.jpostal.{AddressExpander, AddressParser}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 
-private[apache] case class ST_LabelPoint(inputExpressions: Seq[Expression])
+case class ST_LabelPoint(inputExpressions: Seq[Expression])

Review Comment:
   Why removing the package scoping qualifiers in this file?



##########
common/src/main/java/org/apache/sedona/common/utils/S2GeographyFormatUtils.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.utils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.*;
+import org.apache.sedona.common.S2Geography.*;
+import org.apache.sedona.common.S2Geography.WKTReader;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.locationtech.jts.geom.*;
+import org.locationtech.jts.io.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wololo.geojson.Feature;
+import org.wololo.geojson.GeoJSONFactory;
+import org.wololo.jts2geojson.GeoJSONReader;
+
+/** This format mapper is isolated on purpose for the sake of sharing across 
Spark and Flink */
+public class S2GeographyFormatUtils<T extends S2Geography> implements 
Serializable {

Review Comment:
   We need to do a clean up for this file.



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala:
##########
@@ -210,6 +220,10 @@ object InferredTypes {
       expr.toGeography(input)
     } else if (t =:= typeOf[Array[Geometry]]) { expr => input =>
       expr.toGeometryArray(input)
+    } else if (t =:= typeOf[S2Geography]) { expr => input =>
+      expr.toS2Geography(input)
+    } else if (t =:= typeOf[Array[S2Geography]]) { expr => input =>
+      expr.toS2GeographyArray(input)

Review Comment:
   `S2Geography` superseds the placeholder `Geography` type, so the old `else 
if (t =:= typeOf[Geography])` can be removed.



##########
common/src/main/java/org/apache/sedona/common/S2Functions.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common;
+
+import com.google.common.geometry.S2LatLng;
+import com.google.common.geometry.S2Point;
+import java.util.List;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.common.S2Geography.Distance;
+import org.apache.sedona.common.S2Geography.PointGeography;
+import org.apache.sedona.common.S2Geography.S2Geography;
+import org.apache.sedona.common.S2Geography.ShapeIndexGeography;
+
+public class S2Functions {
+
+  public static double distance(S2Geography left, S2Geography right) {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    return Distance.S2_distance((ShapeIndexGeography) indexLeft, 
(ShapeIndexGeography) indexRight);
+  }
+
+  public static double maxDistance(S2Geography left, S2Geography right) {
+    ShapeIndexGeography indexLeft = new ShapeIndexGeography(left);
+    ShapeIndexGeography indexRight = new ShapeIndexGeography(right);
+    return Distance.S2_maxDistance(
+        (ShapeIndexGeography) indexLeft, (ShapeIndexGeography) indexRight);
+  }
+
+  public static S2Geography cloestPoint(S2Geography left, S2Geography right) 
throws Exception {

Review Comment:
   closestPoint



##########
common/src/main/java/org/apache/sedona/common/S2Geography/PointGeography.java:
##########
@@ -239,6 +239,9 @@ public byte get(long i) {
       pts = S2Point.Shape.FAST_CODER.decode(bytes, cursor);
     }
 
+    if (tag.getKind() == GeographyKind.SINGLEPOINT) {
+      return new SinglePointGeography(pts.get(0));
+    }

Review Comment:
   We need to add some encode/decode tests for these newly added Geography 
types to make sure that they roundtrip correctly.



##########
common/src/main/java/org/apache/sedona/common/S2Geography/S2GeographySerializer.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.S2Geography;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.locationtech.jts.geom.*;
+
+public class S2GeographySerializer {
+  private static final Coordinate NULL_COORDINATE = new Coordinate(Double.NaN, 
Double.NaN);
+  private static final PrecisionModel PRECISION_MODEL = new PrecisionModel();
+
+  public static byte[] serialize(S2Geography geography) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    geography.encodeTagged(baos, new EncodeOptions());
+    return baos.toByteArray();
+  }
+
+  public static S2Geography deserialize(byte[] buffer) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+    // WKBReader reader = new WKBReader();

Review Comment:
   Remove commented code



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/S2PredicatesBox.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import com.google.common.geometry.{S2BooleanOperation, S2LatLngRect}
+import org.apache.sedona.common.S2Geography.{PolygonGeography, Predicates, 
S2Geography, S2GeographySerializer, ShapeIndexGeography}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression}
+import org.apache.spark.sql.sedona_sql.UDT.GeographyUDT
+import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, 
DoubleType}
+import 
org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
+
+abstract class ST_S2PredicateBox

Review Comment:
   This UDF has strange semantics and I wonder if there's meaningful use cases 
for it.



##########
spark/common/src/main/java/org/apache/sedona/core/s2geographyRDD/S2GeographyRDD.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.core.s2geographyRDD;
+
+import java.io.Serializable;
+import org.apache.log4j.Logger;
+import org.apache.sedona.common.S2Geography.S2Geography;
+
+public class S2GeographyRDD<T extends S2Geography> implements Serializable {
+

Review Comment:
   This PR does not implement full-fledged S2GeographyRDD support and focuses 
on implementing UDT and UDFs for Spark SQL, so let's remove this and 
`s2geographyRDD.PointRDD`.



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/InferredExpression.scala:
##########
@@ -166,6 +168,14 @@ object InferrableType {
     new InferrableType[Geography] {}
   implicit val geographyArrayInstance: InferrableType[Array[Geography]] =
     new InferrableType[Array[Geography]] {}
+  implicit val s2geographyInstance: InferrableType[S2Geography] =
+    new InferrableType[S2Geography]
+  implicit val s2geographyArrayInstance: InferrableType[Array[S2Geography]] =
+    new InferrableType[Array[S2Geography]]
+  implicit val s2point: InferrableType[S2Point] =
+    new InferrableType[S2Point]
+  implicit val s2pointArrayInstance: InferrableType[Array[S2Point]] =
+    new InferrableType[Array[S2Point]]

Review Comment:
   Do we need this? S2Point is a subtype of S2Geography so it could be handled 
by `s2geographyInstance` added above.



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/S2Constructors.scala:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.sedona.common.S2Geography.S2Geography
+import org.apache.sedona.common.S2Constructors
+import org.apache.sedona.common.enums.FileDataSplitter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
ImplicitCastInputTypes}
+import org.apache.spark.sql.sedona_sql.UDT.{GeographyUDT}
+import 
org.apache.spark.sql.sedona_sql.expressions.implicits.{S2GeographyEnhancer}
+import 
org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Return a point from a string. The string must be plain string and each 
coordinate must be
+ * separated by a delimiter.
+ *
+ * @param inputExpressions
+ *   This function takes 2 parameters. The first parameter is the input 
geometry string, the
+ *   second parameter is the delimiter. String format should be similar to 
CSV/TSV
+ */
+case class ST_S2PointFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.pointFromText _) {
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a polygon from a string. The string must be plain string and each 
coordinate must be
+ * separated by a delimiter.
+ *
+ * @param inputExpressions
+ */
+case class ST_S2PolygonFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.polygonFromText _) {
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a line from a string. The string must be plain string and each 
coordinate must be
+ * separated by a delimiter.
+ *
+ * @param inputExpressions
+ */
+case class ST_S2LineFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.lineFromText _) {
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a linestring from a string. The string must be plain string and each 
coordinate must be
+ * separated by a delimiter.
+ *
+ * @param inputExpressions
+ */
+case class ST_S2LinestringFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.lineStringFromText _) {
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a Geometry from a WKT string
+ *
+ * @param inputExpressions
+ *   This function takes a geometry string and a srid. The string format must 
be WKT.
+ */
+case class ST_S2GeogFromWKT(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.geogFromWKT _) {
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a Geometry from a OGC Extended WKT string
+ *
+ * @param inputExpressions
+ *   This function takes a geometry string. The string format must be OGC 
Extended Well-Known text
+ *   (EWKT) representation.
+ */
+case class ST_S2GeogFromEWKT(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.geogFromEWKT _) {
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a Geometry from a WKT string. Alias to ST_GeomFromWKT
+ *
+ * @param inputExpressions
+ *   This function takes a geometry string and a srid. The string format must 
be WKT.
+ */
+case class ST_S2GeometryFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.geogFromWKT _) {
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a Geometry from a WKT string
+ *
+ * @param inputExpressions
+ *   This function takes a geometry string and a srid. The string format must 
be WKT.
+ */
+case class ST_S2GeogFromText(inputExpressions: Seq[Expression])
+    extends InferredExpression(S2Constructors.geogFromWKT _) {
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a Geometry from a WKB string
+ *
+ * @param inputExpressions
+ *   This function takes 1 parameter which is the utf-8 encoded geometry wkb 
string or the binary
+ *   wkb array.
+ */
+case class ST_S2GeogFromWKB(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with UserDataGenerator {
+  // This is an expression which takes one input expressions
+  assert(inputExpressions.length == 1)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val arg = inputExpressions.head.eval(inputRow)
+    try {
+      arg match {
+        case geomString: UTF8String =>
+          // Parse UTF-8 encoded wkb string
+          S2Constructors
+            .geogFromText(geomString.toString, FileDataSplitter.WKB)
+            .toGenericArrayData
+        case wkb: Array[Byte] =>
+          // convert raw wkb byte array to geometry
+          S2Constructors.geogFromWKB(wkb).toGenericArrayData
+        case null => null
+      }
+    } catch {
+      case e: Exception =>
+        
InferredExpression.throwExpressionInferenceException(getClass.getSimpleName, 
Seq(arg), e)
+    }
+  }
+
+  override def dataType: DataType = GeographyUDT
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(StringType, BinaryType))
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+case class ST_S2GeogFromEWKB(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with UserDataGenerator {
+  // This is an expression which takes one input expressions
+  assert(inputExpressions.length == 1)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val arg = inputExpressions.head.eval(inputRow)
+    try {
+      arg match {
+        case geomString: UTF8String =>
+          // Parse UTF-8 encoded wkb string
+          S2Constructors
+            .geogFromText(geomString.toString, FileDataSplitter.WKB)
+            .toGenericArrayData
+        case wkb: Array[Byte] =>
+          // convert raw wkb byte array to geometry
+          S2Constructors.geogFromWKB(wkb).toGenericArrayData
+        case null => null
+      }
+    } catch {
+      case e: Exception =>
+        
InferredExpression.throwExpressionInferenceException(getClass.getSimpleName, 
Seq(arg), e)
+    }
+  }
+
+  override def dataType: DataType = GeographyUDT
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq(TypeCollection(StringType, BinaryType))
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+case class ST_S2LineFromWKB(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with UserDataGenerator {
+
+  // Validate the number of input expressions (1 or 2)
+  assert(inputExpressions.length >= 1 && inputExpressions.length <= 2)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val wkb = inputExpressions.head.eval(inputRow)
+    val srid =
+      if (inputExpressions.length > 1) 
inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+      else -1
+    try {
+      wkb match {
+        case geomString: UTF8String =>
+          // Parse UTF-8 encoded WKB string
+          val geom = S2Constructors.lineStringFromText(geomString.toString, 
"wkb")
+          val kind = S2Geography.GeographyKind.fromKind(geom.getKind)
+          if (kind == (S2Geography.GeographyKind.POLYLINE) || kind == 
S2Geography.GeographyKind.SINGLEPOLYLINE) {
+            // (if (srid != -1) Functions.setSRID(geom, srid) else 
geom).toGenericArrayData
+            geom.toGenericArrayData
+          } else {
+            null
+          }
+
+        case wkbArray: Array[Byte] =>
+          // Convert raw WKB byte array to geometry
+          S2Constructors.lineFromWKB(wkbArray, srid).toGenericArrayData
+
+        case _ => null
+      }
+    } catch {
+      case e: Exception =>
+        InferredExpression.throwExpressionInferenceException(
+          getClass.getSimpleName,
+          Seq(wkb, srid),
+          e)
+    }
+  }
+
+  override def dataType: DataType = GeographyUDT
+
+  override def inputTypes: Seq[AbstractDataType] =
+    if (inputExpressions.length == 1) Seq(TypeCollection(StringType, 
BinaryType))
+    else Seq(TypeCollection(StringType, BinaryType), IntegerType)
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): 
ST_S2LineFromWKB = {
+    copy(inputExpressions = newChildren)
+  }
+}
+
+case class ST_S2LinestringFromWKB(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with UserDataGenerator {
+
+  // Validate the number of input expressions (1 or 2)
+  assert(inputExpressions.length >= 1 && inputExpressions.length <= 2)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val wkb = inputExpressions.head.eval(inputRow)
+    val srid =
+      if (inputExpressions.length > 1) 
inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+      else -1
+
+    try {
+      wkb match {
+        case geomString: UTF8String =>
+          // Parse UTF-8 encoded WKB string
+          val geom = S2Constructors.lineStringFromText(geomString.toString, 
"wkb")
+          val kind = S2Geography.GeographyKind.fromKind(geom.getKind)
+          if (kind == S2Geography.GeographyKind.POLYLINE || kind == 
S2Geography.GeographyKind.SINGLEPOLYLINE) {
+            // (if (srid != -1) Functions.setSRID(geom, srid) else 
geom).toGenericArrayData
+            geom.toGenericArrayData
+          } else {
+            null
+          }
+
+        case wkbArray: Array[Byte] =>
+          // Convert raw WKB byte array to geometry
+          S2Constructors.lineFromWKB(wkbArray, srid).toGenericArrayData
+
+        case _ => null
+      }
+    } catch {
+      case e: Exception =>
+        InferredExpression.throwExpressionInferenceException(
+          getClass.getSimpleName,
+          Seq(wkb, srid),
+          e)
+    }
+  }
+
+  override def dataType: DataType = GeographyUDT
+
+  override def inputTypes: Seq[AbstractDataType] =
+    if (inputExpressions.length == 1) Seq(TypeCollection(StringType, 
BinaryType))
+    else Seq(TypeCollection(StringType, BinaryType), IntegerType)
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
+    copy(inputExpressions = newChildren)
+  }
+}
+
+case class ST_S2PointFromWKB(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with UserDataGenerator {
+
+  // Validate the number of input expressions (1 or 2)
+  assert(inputExpressions.length >= 1 && inputExpressions.length <= 2)
+
+  override def nullable: Boolean = true
+
+  override def eval(inputRow: InternalRow): Any = {
+    val wkb = inputExpressions.head.eval(inputRow)
+    val srid =
+      if (inputExpressions.length > 1) 
inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+      else -1
+
+    try {
+      wkb match {
+        case geomString: UTF8String =>
+          // Parse UTF-8 encoded WKB string
+          val geom = S2Constructors.pointFromText(geomString.toString, "wkb")
+          val kind = S2Geography.GeographyKind.fromKind(geom.getKind)
+          if (kind == S2Geography.GeographyKind.POINT || kind == 
S2Geography.GeographyKind.SINGLEPOINT) {
+            // (if (srid != -1) Functions.setSRID(geom, srid) else 
geom).toGenericArrayData
+            geom.toGenericArrayData
+          } else {
+            null
+          }
+        case wkbArray: Array[Byte] =>
+          // Convert raw WKB byte array to geometry
+          S2Constructors.pointFromWKB(wkbArray, srid).toGenericArrayData
+        case _ => null
+      }
+    } catch {
+      case e: Exception =>
+        InferredExpression.throwExpressionInferenceException(
+          getClass.getSimpleName,
+          Seq(wkb, srid),
+          e)
+    }
+  }
+
+  override def dataType: DataType = GeographyUDT
+
+  override def inputTypes: Seq[AbstractDataType] =
+    if (inputExpressions.length == 1) Seq(TypeCollection(StringType, 
BinaryType))
+    else Seq(TypeCollection(StringType, BinaryType), IntegerType)
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): ST_S2PointFromWKB = {
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+ * Return a polygon given minX,minY,maxX,maxY and optional SRID
+ *
+ * @param inputExpressions
+ */
+//case class ST_S2MakeEnvelope(inputExpressions: Seq[Expression])
+//    extends InferredExpression(
+//      inferrableFunction5(S2Constructors.makeEnvelope),
+//      inferrableFunction4(S2Constructors.makeEnvelope)) {
+//
+//  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) 
= {
+//    copy(inputExpressions = newChildren)
+//  }
+//}
+
+/**
+ * Return a polygon given minX,minY,maxX,maxY
+ *
+ * @param inputExpressions
+ */
+//case class ST_S2PolygonFromEnvelope(inputExpressions: Seq[Expression])
+//    extends InferredExpression(S2Constructors.polygonFromEnvelope _) {
+//
+//  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) 
= {
+//    copy(inputExpressions = newChildren)
+//  }
+//}
+
+trait S2UserDataGenerator {

Review Comment:
   We don't need `S2UserDataGenerator`. The existing `UserDataGenerator` in 
geometry UDFs for parsing GeoJSON may not work either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to