This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1ebdea77 [GH-2509] Refactor the example projects to include better examples (#2510)
3d1ebdea77 is described below

commit 3d1ebdea77f7caafd13b73138ce92316bea132dd
Author: Jia Yu <[email protected]>
AuthorDate: Sun Nov 16 01:28:39 2025 -0600

    [GH-2509] Refactor the example projects to include better examples (#2510)
---
 .github/workflows/example.yml                      |  43 +++---
 examples/flink-sql/pom.xml                         |  39 ++++-
 examples/flink-sql/src/main/java/FlinkExample.java | 112 ++++++++------
 examples/flink-sql/src/main/java/Utils.java        | 172 +++++++++++----------
 .../src/test/java/FlinkFunctionsTest.java          |  39 +++++
 examples/java-spark-sql/pom.xml                    | 102 ++++++------
 .../src/main/java/spark/GeoParquetAccessor.java    |  87 +++++------
 .../src/main/java/spark/SedonaGeoParquetMain.java  |  59 ++++---
 .../src/main/java/spark/SedonaSparkSession.java    |  49 +++---
 .../src/test/java/spark/SedonaParquetTest.java     | 128 +++++++--------
 examples/spark-sql/pom.xml                         |  56 ++++++-
 examples/spark-sql/src/main/scala/Main.scala       |  22 ++-
 examples/spark-sql/src/main/scala/RddExample.scala |  54 +++++--
 examples/spark-sql/src/main/scala/SqlExample.scala | 135 ++++++++++++----
 examples/spark-sql/src/main/scala/VizExample.scala |  59 +++++--
 .../spark-sql/src/test/scala/testFunctions.scala   | 121 +++++++++++++++
 16 files changed, 838 insertions(+), 439 deletions(-)

diff --git a/.github/workflows/example.yml b/.github/workflows/example.yml
index bd9cb62cc7..2fa3f61bf6 100644
--- a/.github/workflows/example.yml
+++ b/.github/workflows/example.yml
@@ -39,6 +39,7 @@ concurrency:
 
 jobs:
   build:
+    name: 'Spark ${{ matrix.spark }}, Hadoop ${{ matrix.hadoop }}, Sedona ${{ matrix.sedona }}'
     runs-on: ubuntu-22.04
     strategy:
       fail-fast: false
@@ -56,23 +57,6 @@ jobs:
             spark-compat: '3.4'
             sedona: 1.8.0
             hadoop: 3.3.4
-    env:
-      JAVA_TOOL_OPTIONS: >-
-        -XX:+IgnoreUnrecognizedVMOptions
-        --add-opens=java.base/java.lang=ALL-UNNAMED
-        --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
-        --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
-        --add-opens=java.base/java.io=ALL-UNNAMED
-        --add-opens=java.base/java.net=ALL-UNNAMED
-        --add-opens=java.base/java.nio=ALL-UNNAMED
-        --add-opens=java.base/java.util=ALL-UNNAMED
-        --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
-        --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
-        --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-        --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
-        --add-opens=java.base/sun.security.action=ALL-UNNAMED
-        --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
-        -Djdk.reflect.useDirectMethodHandle=false
     steps:
       - uses: actions/checkout@v5
       - uses: actions/setup-java@v5
@@ -100,7 +84,8 @@ jobs:
           path: ~/.m2
           key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
           restore-keys: ${{ runner.os }}-m2
-      - env:
+      - name: Test Scala Spark SQL Example
+        env:
           SPARK_VERSION: ${{ matrix.spark }}
           SPARK_LOCAL_IP: 127.0.0.1
           SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
@@ -109,16 +94,28 @@ jobs:
         run: |
           cd examples/spark-sql
           mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
-          mvn clean install \
+          mvn clean test \
             -Dspark.version=${SPARK_VERSION} \
             -Dspark.compat.version=${SPARK_COMPAT_VERSION} \
             -Dsedona.version=${SEDONA_VERSION} \
             -Dhadoop.version=${HADOOP_VERSION}
-          java -jar target/sedona-spark-example-${SEDONA_VERSION}.jar
-      - env:
+      - name: Test Java Spark SQL Example
+        env:
+          SPARK_VERSION: ${{ matrix.spark }}
+          SPARK_LOCAL_IP: 127.0.0.1
+          SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
+          SEDONA_VERSION: ${{ matrix.sedona }}
+          HADOOP_VERSION: ${{ matrix.hadoop }}
+        run: |
+          cd examples/java-spark-sql
+          mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
+          mvn clean test \
+            -Dspark.version=${SPARK_VERSION} \
+            -Dspark.compat.version=${SPARK_COMPAT_VERSION}
+      - name: Test Flink SQL Example
+        env:
           SEDONA_VERSION: ${{ matrix.sedona }}
         run: |
           cd examples/flink-sql
           mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
-          mvn clean install
-          java -jar target/sedona-flink-example-${SEDONA_VERSION}.jar
+          mvn clean test
diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml
index d6f7e97b68..6c72b6acf4 100644
--- a/examples/flink-sql/pom.xml
+++ b/examples/flink-sql/pom.xml
@@ -31,7 +31,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <geotools.scope>compile</geotools.scope>
         <flink.version>1.19.0</flink.version>
-        <flink.scope>compile</flink.scope>
+        <flink.scope>provided</flink.scope>
         <scala.compat.version>2.12</scala.compat.version>
         <geotools.version>33.1</geotools.version>
         <log4j.version>2.17.2</log4j.version>
@@ -247,6 +247,20 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.2</version>
+                <configuration>
+                    <argLine>
+                        --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+                        --add-opens=java.base/java.nio=ALL-UNNAMED
+                        --add-opens=java.base/java.lang=ALL-UNNAMED
+                        --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+                        --add-opens=java.base/java.util=ALL-UNNAMED
+                    </argLine>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.jacoco</groupId>
                 <artifactId>jacoco-maven-plugin</artifactId>
@@ -266,6 +280,29 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>2.35.0</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.15.0</version>
+                        </googleJavaFormat>
+                        <licenseHeader>
+                            <file>../../tools/maven/license-header.txt</file>
+                        </licenseHeader>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
         <resources>
             <resource>
diff --git a/examples/flink-sql/src/main/java/FlinkExample.java b/examples/flink-sql/src/main/java/FlinkExample.java
index c59eb8125e..7ded36e0d6 100644
--- a/examples/flink-sql/src/main/java/FlinkExample.java
+++ b/examples/flink-sql/src/main/java/FlinkExample.java
@@ -16,70 +16,84 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
 import org.apache.sedona.flink.SedonaFlinkRegistrator;
 import org.apache.sedona.flink.expressions.Constructors;
 
-import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-
-public class FlinkExample
-{
-    static String[] pointColNames = {"geom_point", "name_point", "event_time", 
"proc_time"};
-
-    static String[] polygonColNames = {"geom_polygon", "name_polygon", 
"event_time", "proc_time"};
+public class FlinkExample {
+  static String[] pointColNames = {"geom_point", "name_point", "event_time", 
"proc_time"};
 
-    public static void main(String[] args) {
-        int testDataSize = 10;
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
-        SedonaFlinkRegistrator.registerType(env);
-        SedonaFlinkRegistrator.registerFunc(tableEnv);
+  static String[] polygonColNames = {"geom_polygon", "name_polygon", 
"event_time", "proc_time"};
 
-        // Create a fake WKT string table source
-        Table pointWktTable = Utils.createTextTable(env, tableEnv, 
Utils.createPointWKT(testDataSize), pointColNames);
+  public static void main(String[] args) {
+    testS2SpatialJoin(10);
+  }
 
-        // Create a geometry column
-        Table pointTable = pointWktTable.select(
-            call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
-            $(pointColNames[1]));
+  public static void testS2SpatialJoin(int testDataSize) {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+    SedonaFlinkRegistrator.registerType(env);
+    SedonaFlinkRegistrator.registerFunc(tableEnv);
 
-        // Create S2CellID
-        pointTable = pointTable.select($(pointColNames[0]), 
$(pointColNames[1]),
-                call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
-        // Explode s2id array
-        tableEnv.createTemporaryView("pointTable", pointTable);
-        pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, 
s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS 
tmpTbl1(s2id_point)");
+    // Create a fake WKT string table source
+    Table pointWktTable =
+        Utils.createTextTable(env, tableEnv, 
Utils.createPointWKT(testDataSize), pointColNames);
 
+    // Create a geometry column
+    Table pointTable =
+        pointWktTable.select(
+            call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), 
$(pointColNames[1]));
 
-        // Create a fake WKT string table source
-        Table polygonWktTable = Utils.createTextTable(env, tableEnv, 
Utils.createPolygonWKT(testDataSize), polygonColNames);
-        // Create a geometry column
-        Table polygonTable = 
polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
-                        $(polygonColNames[0])).as(polygonColNames[0]),
-                $(polygonColNames[1]));
-        // Create S2CellID
-        polygonTable = polygonTable.select($(polygonColNames[0]), 
$(polygonColNames[1]),
-                call("ST_S2CellIDs", $(polygonColNames[0]), 
6).as("s2id_array"));
-        // Explode s2id array
-        tableEnv.createTemporaryView("polygonTable", polygonTable);
-        polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, 
s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS 
tmpTbl2(s2id_polygon)");
+    // Create S2CellID
+    pointTable =
+        pointTable.select(
+            $(pointColNames[0]),
+            $(pointColNames[1]),
+            call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
+    // Explode s2id array
+    tableEnv.createTemporaryView("pointTable", pointTable);
+    pointTable =
+        tableEnv.sqlQuery(
+            "SELECT geom_point, name_point, s2id_point FROM pointTable CROSS 
JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
 
-        // TODO: TableImpl.print() occurs EOF Exception due to 
https://issues.apache.org/jira/browse/FLINK-35406
-        // Use polygonTable.execute().print() when FLINK-35406 is fixed.
-        polygonTable.execute().collect().forEachRemaining(row -> 
System.out.println(row));
+    // Create a fake WKT string table source
+    Table polygonWktTable =
+        Utils.createTextTable(env, tableEnv, 
Utils.createPolygonWKT(testDataSize), polygonColNames);
+    // Create a geometry column
+    Table polygonTable =
+        polygonWktTable.select(
+            call(Constructors.ST_GeomFromWKT.class.getSimpleName(), 
$(polygonColNames[0]))
+                .as(polygonColNames[0]),
+            $(polygonColNames[1]));
+    // Create S2CellID
+    polygonTable =
+        polygonTable.select(
+            $(polygonColNames[0]),
+            $(polygonColNames[1]),
+            call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
+    // Explode s2id array
+    tableEnv.createTemporaryView("polygonTable", polygonTable);
+    polygonTable =
+        tableEnv.sqlQuery(
+            "SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable 
CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
 
-        // Join two tables by their S2 ids
-        Table joinResult = 
pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
-        // Optional: remove false positives
-        joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), 
$("geom_point")));
-        joinResult.execute().collect().forEachRemaining(row -> 
System.out.println(row));
-    }
+    // TODO: TableImpl.print() occurs EOF Exception due to
+    // https://issues.apache.org/jira/browse/FLINK-35406
+    // Use polygonTable.execute().print() when FLINK-35406 is fixed.
+    polygonTable.execute().collect().forEachRemaining(row -> 
System.out.println(row));
 
+    // Join two tables by their S2 ids
+    Table joinResult =
+        
pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
+    // Optional: remove false positives
+    joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), 
$("geom_point")));
+    joinResult.execute().collect().forEachRemaining(row -> 
System.out.println(row));
+  }
 }
diff --git a/examples/flink-sql/src/main/java/Utils.java b/examples/flink-sql/src/main/java/Utils.java
index 0a95ab1b7b..abe1c3d3a4 100644
--- a/examples/flink-sql/src/main/java/Utils.java
+++ b/examples/flink-sql/src/main/java/Utils.java
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import static org.apache.flink.table.api.Expressions.$;
 
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,91 +32,102 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-public class Utils
-{
-    static Long timestamp_base = new 
Timestamp(System.currentTimeMillis()).getTime();
-    static Long time_interval = 1L; // Generate a record per this interval. 
Unit is second
+public class Utils {
+  static Long timestamp_base = new 
Timestamp(System.currentTimeMillis()).getTime();
+  static Long time_interval = 1L; // Generate a record per this interval. Unit 
is second
 
-    static List<Row> createPointText(int size){
-        List<Row> data = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            // Create a number of points (1, 1) (2, 2) ...
-            data.add(Row.of(i + "," + i, "point" + i, timestamp_base + 
time_interval * 1000 * i));
-        }
-        return data;
+  static List<Row> createPointText(int size) {
+    List<Row> data = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      // Create a number of points (1, 1) (2, 2) ...
+      data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval 
* 1000 * i));
     }
+    return data;
+  }
 
-    static List<Row> createPolygonText(int size) {
-        List<Row> data = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            // Create polygons each of which only has 1 match in points
-            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
-            String minX = String.valueOf(i - 0.5);
-            String minY = String.valueOf(i - 0.5);
-            String maxX = String.valueOf(i + 0.5);
-            String maxY = String.valueOf(i + 0.5);
-            List<String> polygon = new ArrayList<>();
-            polygon.add(minX);polygon.add(minY);
-            polygon.add(minX);polygon.add(maxY);
-            polygon.add(maxX);polygon.add(maxY);
-            polygon.add(maxX);polygon.add(minY);
-            polygon.add(minX);polygon.add(minY);
-            data.add(Row.of(String.join(",", polygon), "polygon" + i, 
timestamp_base + time_interval * 1000 * i));
-        }
-        return data;
+  static List<Row> createPolygonText(int size) {
+    List<Row> data = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      // Create polygons each of which only has 1 match in points
+      // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+      String minX = String.valueOf(i - 0.5);
+      String minY = String.valueOf(i - 0.5);
+      String maxX = String.valueOf(i + 0.5);
+      String maxY = String.valueOf(i + 0.5);
+      List<String> polygon = new ArrayList<>();
+      polygon.add(minX);
+      polygon.add(minY);
+      polygon.add(minX);
+      polygon.add(maxY);
+      polygon.add(maxX);
+      polygon.add(maxY);
+      polygon.add(maxX);
+      polygon.add(minY);
+      polygon.add(minX);
+      polygon.add(minY);
+      data.add(
+          Row.of(
+              String.join(",", polygon), "polygon" + i, timestamp_base + 
time_interval * 1000 * i));
     }
+    return data;
+  }
 
-    static List<Row> createPointWKT(int size){
-        List<Row> data = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            // Create a number of points (1, 1) (2, 2) ...
-            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, 
timestamp_base + time_interval * 1000 * i));
-        }
-        return data;
+  static List<Row> createPointWKT(int size) {
+    List<Row> data = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      // Create a number of points (1, 1) (2, 2) ...
+      data.add(
+          Row.of(
+              "POINT (" + i + " " + i + ")",
+              "point" + i,
+              timestamp_base + time_interval * 1000 * i));
     }
+    return data;
+  }
 
-    static List<Row> createPolygonWKT(int size) {
-        List<Row> data = new ArrayList<>();
-        for (int i = 0; i < size; i++) {
-            // Create polygons each of which only has 1 match in points
-            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
-            String minX = String.valueOf(i - 0.5);
-            String minY = String.valueOf(i - 0.5);
-            String maxX = String.valueOf(i + 0.5);
-            String maxY = String.valueOf(i + 0.5);
-            List<String> polygon = new ArrayList<>();
-            polygon.add(minX + " " + minY);
-            polygon.add(minX + " " + maxY);
-            polygon.add(maxX + " " + maxY);
-            polygon.add(maxX + " " + minY);
-            polygon.add(minX + " " + minY);
-            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", 
"polygon" + i, timestamp_base + time_interval * 1000 * i));
-        }
-        return data;
+  static List<Row> createPolygonWKT(int size) {
+    List<Row> data = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      // Create polygons each of which only has 1 match in points
+      // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+      String minX = String.valueOf(i - 0.5);
+      String minY = String.valueOf(i - 0.5);
+      String maxX = String.valueOf(i + 0.5);
+      String maxY = String.valueOf(i + 0.5);
+      List<String> polygon = new ArrayList<>();
+      polygon.add(minX + " " + minY);
+      polygon.add(minX + " " + maxY);
+      polygon.add(maxX + " " + maxY);
+      polygon.add(maxX + " " + minY);
+      polygon.add(minX + " " + minY);
+      data.add(
+          Row.of(
+              "POLYGON ((" + String.join(", ", polygon) + "))",
+              "polygon" + i,
+              timestamp_base + time_interval * 1000 * i));
     }
+    return data;
+  }
 
-    static Table createTextTable(StreamExecutionEnvironment env, 
StreamTableEnvironment tableEnv, List<Row> data, String[] colNames){
-        TypeInformation<?>[] colTypes = {
-                BasicTypeInfo.STRING_TYPE_INFO,
-                BasicTypeInfo.STRING_TYPE_INFO,
-                BasicTypeInfo.LONG_TYPE_INFO
-        };
-        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, 
Arrays.copyOfRange(colNames, 0, 3));
-        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
-        // Generate Time Attribute
-        WatermarkStrategy<Row> wmStrategy =
-                WatermarkStrategy
-                        .<Row>forMonotonousTimestamps()
-                        .withTimestampAssigner((event, timestamp) -> 
event.getFieldAs(2));
-        return 
tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), 
$(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), 
$(colNames[3]).proctime());
-    }
-
-
+  static Table createTextTable(
+      StreamExecutionEnvironment env,
+      StreamTableEnvironment tableEnv,
+      List<Row> data,
+      String[] colNames) {
+    TypeInformation<?>[] colTypes = {
+      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO
+    };
+    RowTypeInfo typeInfo = new RowTypeInfo(colTypes, 
Arrays.copyOfRange(colNames, 0, 3));
+    DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+    // Generate Time Attribute
+    WatermarkStrategy<Row> wmStrategy =
+        WatermarkStrategy.<Row>forMonotonousTimestamps()
+            .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
+    return tableEnv.fromDataStream(
+        ds.assignTimestampsAndWatermarks(wmStrategy),
+        $(colNames[0]),
+        $(colNames[1]),
+        $(colNames[2]).rowtime(),
+        $(colNames[3]).proctime());
+  }
 }
diff --git a/examples/flink-sql/src/test/java/FlinkFunctionsTest.java b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
new file mode 100644
index 0000000000..64c63f8127
--- /dev/null
+++ b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.junit.Test;
+
+public class FlinkFunctionsTest {
+  @Test
+  public void testS2SpatialJoinWithSmallDataset() {
+    // Test with small dataset
+    FlinkExample.testS2SpatialJoin(5);
+  }
+
+  @Test
+  public void testS2SpatialJoinWithMediumDataset() {
+    // Test with medium dataset
+    FlinkExample.testS2SpatialJoin(10);
+  }
+
+  @Test
+  public void testS2SpatialJoinWithLargeDataset() {
+    // Test with larger dataset
+    FlinkExample.testS2SpatialJoin(20);
+  }
+}
diff --git a/examples/java-spark-sql/pom.xml b/examples/java-spark-sql/pom.xml
index 640cf400f2..787e8d911b 100644
--- a/examples/java-spark-sql/pom.xml
+++ b/examples/java-spark-sql/pom.xml
@@ -23,7 +23,7 @@
 
        <groupId>org.apache.sedona</groupId>
        <artifactId>sedona-java-spark-example</artifactId>
-       <version>1.6.1</version>
+       <version>1.8.0</version>
        <name>Sedona : Examples : Java Spark SQL</name>
        <description>Example project for Apache Sedona with Java and 
Spark.</description>
 
@@ -32,11 +32,31 @@
         <spark.scope>provided</spark.scope>
         <javax.scope>test</javax.scope>
 
-               <sedona.version>1.6.1</sedona.version>
+        <sedona.version>1.8.0</sedona.version>
         <geotools.version>1.8.0-33.1</geotools.version>
-        <spark.version>3.5.7</spark.version>
+        <spark.version>4.0.1</spark.version>
+        <spark.compat.version>4.0</spark.compat.version>
+        <scala.compat.version>2.13</scala.compat.version>
         <javax.servlet.version>4.0.1</javax.servlet.version>
-        <spotless.version>3.0.0</spotless.version>
+
+        <!-- For JDK-17 and above -->
+        <extraJavaArgs>
+            -XX:+IgnoreUnrecognizedVMOptions
+            --add-opens=java.base/java.lang=ALL-UNNAMED
+            --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+            --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/java.io=ALL-UNNAMED
+            --add-opens=java.base/java.net=ALL-UNNAMED
+            --add-opens=java.base/java.nio=ALL-UNNAMED
+            --add-opens=java.base/java.util=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+            --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+            --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+            --add-opens=java.base/sun.security.action=ALL-UNNAMED
+            --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+            -Djdk.reflect.useDirectMethodHandle=false
+        </extraJavaArgs>
        </properties>
 
        <dependencies>
@@ -47,17 +67,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.sedona</groupId>
-            <artifactId>sedona-spark-shaded-3.5_2.13</artifactId>
+            
<artifactId>sedona-spark-shaded-${spark.compat.version}_${scala.compat.version}</artifactId>
             <version>${sedona.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.13</artifactId>
+            <artifactId>spark-core_${scala.compat.version}</artifactId>
             <version>${spark.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.13</artifactId>
+            <artifactId>spark-sql_${scala.compat.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
@@ -68,9 +88,10 @@
             <scope>${javax.scope}</scope>
         </dependency>
         <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <version>5.2.0-M1</version>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.1</version>
+            <scope>test</scope>
         </dependency>
 
        </dependencies>
@@ -79,21 +100,15 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>3.2.5</version>
+                <version>2.22.2</version>
                 <configuration>
-                    <argLine>
-                        --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-                        --add-opens=java.base/java.nio=ALL-UNNAMED
-                        --add-opens=java.base/java.lang=ALL-UNNAMED
-                        --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
-                        --add-opens=java.base/java.util=ALL-UNNAMED
-                    </argLine>
+                    <argLine>${extraJavaArgs}</argLine>
                 </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>2.1</version>
+                <version>3.5.0</version>
                 <executions>
                     <execution>
                         <phase>package</phase>
@@ -136,12 +151,7 @@
                                     </excludes>
                                 </filter>
                             </filters>
-                            <argLine>
-                                --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
-                                --add-opens=java.base/java.nio=ALL-UNNAMED
-                                --add-opens=java.base/java.lang=ALL-UNNAMED
-                                --add-opens=java.base/java.util=ALL-UNNAMED
-                            </argLine>
+                            <argLine>${extraJavaArgs}</argLine>
                         </configuration>
                     </execution>
                 </executions>
@@ -149,36 +159,26 @@
             <plugin>
                 <groupId>com.diffplug.spotless</groupId>
                 <artifactId>spotless-maven-plugin</artifactId>
-                <version>${spotless.version}</version>
+                <version>2.35.0</version>
                 <configuration>
-                    <formats>
-                    <!-- you can define as many formats as you want, each is 
independent -->
-                    <format>
-                        <!-- define the files to apply to -->
-                        <includes>
-                        <include>.gitattributes</include>
-                        <include>.gitignore</include>
-                        </includes>
-                        <!-- define the steps to apply to those files -->
-                        <trimTrailingWhitespace/>
-                        <endWithNewline/>
-                        <indent>
-                        <tabs>true</tabs>
-                        <spacesPerTab>4</spacesPerTab>
-                        </indent>
-                    </format>
-                    </formats>
-                    <!-- define a language-specific format -->
                     <java>
-                    <googleJavaFormat>
-                        <version>1.10</version>
-                        <style>AOSP</style>
-                        <reflowLongStrings>true</reflowLongStrings>
-                        <formatJavadoc>false</formatJavadoc>
-                    </googleJavaFormat>
+                        <googleJavaFormat>
+                            <version>1.15.0</version>
+                        </googleJavaFormat>
+                        <licenseHeader>
+                            <file>../../tools/maven/license-header.txt</file>
+                        </licenseHeader>
                     </java>
                 </configuration>
-                </plugin>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
+            </plugin>
                </plugins>
        </build>
 </project>
diff --git a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
index ba8e6a1f65..1745d823ab 100644
--- a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
+++ b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package spark;
 
+import java.util.List;
 import org.apache.sedona.core.spatialOperator.RangeQuery;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.sedona.core.spatialRDD.SpatialRDD;
@@ -31,60 +31,57 @@ import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.GeometryFactory;
 import org.locationtech.jts.geom.Polygon;
 
-import java.util.List;
-
-
 public class GeoParquetAccessor {
 
-    private final SparkSession session;
-    private String parquetPath;
+  private final SparkSession session;
+  private String parquetPath;
 
-    public GeoParquetAccessor() {
-        this.session = new SedonaSparkSession().getSession();
-        this.parquetPath = "";
-    }
+  public GeoParquetAccessor() {
+    this.session = new SedonaSparkSession().getSession();
+    this.parquetPath = "";
+  }
 
-    //Overload with constructor that has Spark session provided
-    //Use to avoid error - can't have two SparkContext objects on one JVM
-    public GeoParquetAccessor(SparkSession session, String parquetPath) {
-        this.session = session;
-        this.parquetPath = parquetPath;
-    }
+  // Overload with constructor that has Spark session provided
+  // Use to avoid error - can't have two SparkContext objects on one JVM
+  public GeoParquetAccessor(SparkSession session, String parquetPath) {
+    this.session = session;
+    this.parquetPath = parquetPath;
+  }
 
-    public List<Geometry> selectFeaturesByPolygon(double xmin, double ymax,
-                                                  double xmax, double ymin,
-                                                  String geometryColumn) {
+  public List<Geometry> selectFeaturesByPolygon(
+      double xmin, double ymax, double xmax, double ymin, String 
geometryColumn) {
 
-        //Read the GeoParquet file into a DataFrame
-        Dataset<Row> insarDF = 
session.read().format("geoparquet").load(parquetPath);
+    // Read the GeoParquet file into a DataFrame
+    Dataset<Row> insarDF = 
session.read().format("geoparquet").load(parquetPath);
 
-        //Convert the DataFrame to a SpatialRDD
-        //The second argument to toSpatialRdd is the name of the geometry 
column.
-        SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, 
geometryColumn);
+    // Convert the DataFrame to a SpatialRDD
+    // The second argument to toSpatialRdd is the name of the geometry column.
+    SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, 
geometryColumn);
 
-        // Define the polygon for the spatial query
-        GeometryFactory geometryFactory = new GeometryFactory();
-        Coordinate[] coordinates = new Coordinate[] {
-            new Coordinate(xmin, ymin),
-            new Coordinate(xmax, ymin),
-            new Coordinate(xmax, ymax),
-            new Coordinate(xmin, ymax),
-            new Coordinate(xmin, ymin) // A closed polygon has the same start 
and end coordinate
+    // Define the polygon for the spatial query
+    GeometryFactory geometryFactory = new GeometryFactory();
+    Coordinate[] coordinates =
+        new Coordinate[] {
+          new Coordinate(xmin, ymin),
+          new Coordinate(xmax, ymin),
+          new Coordinate(xmax, ymax),
+          new Coordinate(xmin, ymax),
+          new Coordinate(xmin, ymin) // A closed polygon has the same start 
and end coordinate
         };
-        Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
-
-        // Perform the spatial range query
-        // This will return all geometries that intersect with the query 
polygon.
-        // Alternatives are SpatialPredicate.CONTAINS or 
SpatialPredicate.WITHIN
-        SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
-        try {
-            resultRDD.rawSpatialRDD = RangeQuery.SpatialRangeQuery(insarRDD, 
queryPolygon, SpatialPredicate.INTERSECTS, false);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
 
-        // Collect the results back to the driver
-        return resultRDD.getRawSpatialRDD().collect();
+    // Perform the spatial range query
+    // This will return all geometries that intersect with the query polygon.
+    // Alternatives are SpatialPredicate.CONTAINS or SpatialPredicate.WITHIN
+    SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
+    try {
+      resultRDD.rawSpatialRDD =
+          RangeQuery.SpatialRangeQuery(insarRDD, queryPolygon, 
SpatialPredicate.INTERSECTS, false);
+    } catch (Exception e) {
+      e.printStackTrace();
     }
 
+    // Collect the results back to the driver
+    return resultRDD.getRawSpatialRDD().collect();
+  }
 }
diff --git 
a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java 
b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
index 4a11437283..5dfe45a46e 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
@@ -16,46 +16,45 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package spark;
 
-import org.locationtech.jts.geom.Coordinate;
-import org.locationtech.jts.geom.Geometry;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Properties;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
 
 public class SedonaGeoParquetMain {
 
-    protected static Properties properties;
-    protected static String parquetPath;
-    protected static SedonaSparkSession session;
+  protected static Properties properties;
+  protected static String parquetPath;
+  protected static SedonaSparkSession session;
 
-    public static void main(String args[]) {
+  public static void main(String args[]) {
 
-        session = new SedonaSparkSession();
-        //Get parquetPath and any other application.properties
-        try {
-            ClassLoader loader = 
Thread.currentThread().getContextClassLoader();
-            Properties properties = new Properties();
-            InputStream is = 
loader.getResourceAsStream("application.properties");
-            properties.load(is);
-            parquetPath = properties.getProperty("parquet.path");
-        } catch (IOException e) {
-            e.printStackTrace();
-            parquetPath = "";
-        }
-        GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, 
parquetPath);
-        //Test parquet happens to be in New Zealand Transverse Mercator 
(EPSG:2193) (meters)
-        List<Geometry> geoms = accessor.selectFeaturesByPolygon(1155850, 
4819840, 1252000, 4748100, "geometry");
-        System.out.println("Coordinates of convex hull of points in 
boundary:");
-        for (Geometry geom : geoms) {
-            Coordinate[] convexHullCoordinates = 
geom.convexHull().getCoordinates();
-            for (Coordinate coord : convexHullCoordinates) {
-                System.out.println(String.format("\t%s", coord.toString()));
-            }
-        }
+    session = new SedonaSparkSession();
+    // Get parquetPath and any other application.properties
+    try {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      Properties properties = new Properties();
+      InputStream is = loader.getResourceAsStream("application.properties");
+      properties.load(is);
+      parquetPath = properties.getProperty("parquet.path");
+    } catch (IOException e) {
+      e.printStackTrace();
+      parquetPath = "";
+    }
+    GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, 
parquetPath);
+    // Test parquet happens to be in New Zealand Transverse Mercator 
(EPSG:2193) (meters)
+    List<Geometry> geoms =
+        accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, 
"geometry");
+    System.out.println("Coordinates of convex hull of points in boundary:");
+    for (Geometry geom : geoms) {
+      Coordinate[] convexHullCoordinates = geom.convexHull().getCoordinates();
+      for (Coordinate coord : convexHullCoordinates) {
+        System.out.println(String.format("\t%s", coord.toString()));
+      }
     }
+  }
 }
diff --git 
a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java 
b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
index 6be6c99585..aaf1c938fe 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
@@ -16,36 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package spark;
 
 import org.apache.sedona.spark.SedonaContext;
 import org.apache.spark.sql.SparkSession;
 
-
 public class SedonaSparkSession {
 
-    public SparkSession session;
-
-    public SedonaSparkSession() {
-
-        //Set configuration for localhost spark cluster. Intended to be run 
from IDE or similar.
-        //Use SedonaContext builder to create SparkSession with Sedona 
extensions
-        SparkSession config = SedonaContext.builder()
-                                   .appName(this.getClass().getSimpleName())
-                                   .master("local[*]")
-                                   .config("spark.ui.enabled", "false")
-                                   .config("spark.driver.extraJavaOptions",
-                                        
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
-                                   .getOrCreate();
-
-        //Create Sedona-enabled SparkSession
-        this.session = SedonaContext.create(config);
-    }
-
-    public SparkSession getSession() {
-        // Access SparkSession object
-        return this.session;
-    }
-
+  public SparkSession session;
+
+  public SedonaSparkSession() {
+
+    // Set configuration for localhost spark cluster. Intended to be run from 
IDE or similar.
+    // Use SedonaContext builder to create SparkSession with Sedona extensions
+    SparkSession config =
+        SedonaContext.builder()
+            .appName(this.getClass().getSimpleName())
+            .master("local[*]")
+            .config("spark.ui.enabled", "false")
+            .config(
+                "spark.driver.extraJavaOptions",
+                "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
+            .getOrCreate();
+
+    // Create Sedona-enabled SparkSession
+    this.session = SedonaContext.create(config);
+  }
+
+  public SparkSession getSession() {
+    // Access SparkSession object
+    return this.session;
+  }
 }
diff --git a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java 
b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
index 036cdda956..f965e5b510 100644
--- a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
+++ b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
@@ -16,87 +16,71 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package spark;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class SedonaParquetTest {
 
-
-    protected static Properties properties;
-    protected static String parquetPath;
-    protected static SedonaSparkSession session;
-
-    public SedonaParquetTest() {
-    }
-
-    @BeforeAll
-    public static void setUpClass() throws IOException {
-
-        session = new SedonaSparkSession();
-        //Get parquetPath and any other application.properties
-        try {
-            ClassLoader loader = 
Thread.currentThread().getContextClassLoader();
-            Properties properties = new Properties();
-            InputStream is = 
loader.getResourceAsStream("application.properties");
-            properties.load(is);
-            parquetPath = properties.getProperty("parquet.path");
-        } catch (IOException e) {
-            e.printStackTrace();
-            parquetPath = "";
-        }
-
+  protected static Properties properties;
+  protected static String parquetPath;
+  protected static SedonaSparkSession session;
+
+  public SedonaParquetTest() {}
+
+  @BeforeClass
+  public static void setUpClass() throws IOException {
+
+    session = new SedonaSparkSession();
+    // Get parquetPath and any other application.properties
+    try {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      Properties properties = new Properties();
+      InputStream is = loader.getResourceAsStream("application.properties");
+      properties.load(is);
+      parquetPath = properties.getProperty("parquet.path");
+    } catch (IOException e) {
+      e.printStackTrace();
+      parquetPath = "";
     }
-
-    @AfterAll
-    public static void tearDownClass() {
-    }
-
-    @BeforeEach
-    public void setUp() {
-    }
-
-    @AfterEach
-    public void tearDown() {
-    }
-
-    @Test
-    public void connects() {
-        assertNotNull(session, "SparkSedonaSession not initialized 
correctly.");
-        assertNotNull(session.session, "Spark session not initialized inside 
SparkSedonaSession.");
-    }
-
-    @Test
-    public void parquetAccessible() {
-        File file = new File(parquetPath);
-        assertTrue(file.exists(), "Parquet file does not exist.");
-        assertTrue(file.canRead(), "Can't read geoparquet file on record.");
-    }
-
-    @Test
-    public void canLoadRDD() {
-        assertNotNull(session, "Session is null.");
-        Dataset<Row> insarDF = session.session.read()
-                .format("geoparquet")
-                .load(parquetPath);
-        assertNotNull(insarDF, "Dataset was not created.");
-        assertTrue(insarDF.count() > 0, "Dataset is empty.");
-    }
-
+  }
+
+  @AfterClass
+  public static void tearDownClass() {}
+
+  @Before
+  public void setUp() {}
+
+  @Test
+  public void connects() {
+    assertNotNull("SparkSedonaSession not initialized correctly.", session);
+    assertNotNull("Spark session not initialized inside SparkSedonaSession.", 
session.session);
+  }
+
+  @Test
+  public void parquetAccessible() {
+    File file = new File(parquetPath);
+    assertTrue("Parquet file does not exist.", file.exists());
+    assertTrue("Can't read geoparquet file on record.", file.canRead());
+  }
+
+  @Test
+  public void canLoadRDD() {
+    assertNotNull("Session is null.", session);
+    Dataset<Row> insarDF = 
session.session.read().format("geoparquet").load(parquetPath);
+    assertNotNull("Dataset was not created.", insarDF);
+    assertTrue("Dataset is empty.", insarDF.count() > 0);
+  }
 }
diff --git a/examples/spark-sql/pom.xml b/examples/spark-sql/pom.xml
index f2c647d653..28066a6460 100644
--- a/examples/spark-sql/pom.xml
+++ b/examples/spark-sql/pom.xml
@@ -30,7 +30,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <dependency.scope>compile</dependency.scope>
+        <spark.scope>provided</spark.scope>
         <sedona.scope>compile</sedona.scope>
         <geotools.scope>compile</geotools.scope>
 
@@ -66,13 +66,13 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.compat.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>${dependency.scope}</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.compat.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>${dependency.scope}</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.sedona</groupId>
@@ -185,7 +185,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
             <version>${hadoop.version}</version>
-            <scope>${dependency.scope}</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
@@ -193,6 +193,12 @@
             <version>4.13.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.compat.version}</artifactId>
+            <version>3.2.15</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <repositories>
         <repository>
@@ -355,6 +361,48 @@
                     <argLine>${extraJavaArgs}</argLine>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>2.2.0</version>
+                <configuration>
+                    
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuite.txt</filereports>
+                    <argLine>--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.a [...]
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>2.35.0</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.15.0</version>
+                        </googleJavaFormat>
+                        <licenseHeader>
+                            <file>../../tools/maven/license-header.txt</file>
+                        </licenseHeader>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
         <resources>
             <resource>
diff --git a/examples/spark-sql/src/main/scala/Main.scala 
b/examples/spark-sql/src/main/scala/Main.scala
index cd3e4c67a8..45efd85165 100644
--- a/examples/spark-sql/src/main/scala/Main.scala
+++ b/examples/spark-sql/src/main/scala/Main.scala
@@ -26,6 +26,15 @@ import 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
 import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
 
 
+/**
+ * Main entry point for running Sedona SQL, RDD, and Visualization examples.
+ * Demonstrates various spatial operations including:
+ * - SQL-based spatial queries and joins
+ * - GeoParquet I/O operations
+ * - Shapefile and raster data handling
+ * - RDD-based spatial analysis
+ * - Spatial visualization techniques
+ */
 object Main extends App {
   Logger.getRootLogger().setLevel(Level.WARN)
 
@@ -39,20 +48,31 @@ object Main extends App {
 
        val resourceFolder = 
System.getProperty("user.dir")+"/src/test/resources/"
 
+  // SQL-based spatial operations
+  println("=== Running SQL Examples ===")
   testPredicatePushdownAndRangeJonQuery(sedona)
   testDistanceJoinQuery(sedona)
   testAggregateFunction(sedona)
   testShapefileConstructor(sedona)
   testRasterIOAndMapAlgebra(sedona)
 
+  // GeoParquet operations
+  println("\n=== Running GeoParquet Examples ===")
+  testGeoParquetWriter(sedona)
+  testGeoParquetReader(sedona)
+
+  // RDD-based spatial analysis
+  println("\n=== Running RDD Examples ===")
   visualizeSpatialColocation(sedona)
   calculateSpatialColocation(sedona)
 
+  // Visualization examples
+  println("\n=== Running Visualization Examples ===")
   buildScatterPlot(sedona)
   buildHeatMap(sedona)
   buildChoroplethMap(sedona)
   parallelFilterRenderNoStitch(sedona)
   sqlApiVisualization(sedona)
-  System.out.println("All SedonaSQL DEMOs passed!")
 
+  println("\n✅ All Sedona examples completed successfully!")
 }
diff --git a/examples/spark-sql/src/main/scala/RddExample.scala 
b/examples/spark-sql/src/main/scala/RddExample.scala
index 7dc54860cf..6f66305150 100644
--- a/examples/spark-sql/src/main/scala/RddExample.scala
+++ b/examples/spark-sql/src/main/scala/RddExample.scala
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-import Main.resourceFolder
 import org.apache.sedona.core.enums.{GridType, IndexType}
 import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
 import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -34,6 +33,8 @@ import java.awt.Color
 
 object RddExample {
 
+  val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
   // Data link (in shapefile): https://geo.nyu.edu/catalog/nyu_2451_34514
   val nycArealandmarkShapefileLocation = 
resourceFolder+"nyc-area-landmark-shapefile"
 
@@ -42,16 +43,25 @@ object RddExample {
 
   val colocationMapLocation = System.getProperty("user.dir")+"/colocationMap"
 
+  /**
+   * Visualizes spatial co-location between NYC landmarks and taxi pickup 
points.
+   * Creates an overlay visualization with landmarks (scatter plot) and taxi 
trips (heat map).
+   *
+   * Note: This function uses RDD API to demonstrate low-level spatial 
operations.
+   * For DataFrame-based approach, see SqlExample.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def visualizeSpatialColocation(sedona: SparkSession): Unit =
   {
     // Prepare NYC area landmarks which includes airports, museums, colleges, 
hospitals
-    var arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, 
nycArealandmarkShapefileLocation)
+    val arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, 
nycArealandmarkShapefileLocation)
 
     // Prepare NYC taxi trips. Only use the taxi trips' pickup points
-    var tripDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+    val tripDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
     // Convert from DataFrame to RDD. This can also be done directly through 
Sedona RDD API.
     tripDf.createOrReplaceTempView("tripdf")
-    var tripRDD = Adapter.toSpatialRdd(sedona.sql("select 
ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 
14))) as point from tripdf")
+    val tripRDD = Adapter.toSpatialRdd(sedona.sql("select 
ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 
14))) as point from tripdf")
       , "point")
 
     // Convert the Coordinate Reference System from degree-based to 
meter-based. This returns the accurate distance calculate.
@@ -79,6 +89,16 @@ object RddExample {
     imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, 
colocationMapLocation, ImageType.PNG)
   }
 
+  /**
+   * Calculates spatial co-location using Ripley's K function.
+   * Analyzes whether taxi trips are clustered around NYC landmarks at various 
distance thresholds.
+   * Uses distance join queries to compute co-location statistics.
+   *
+   * The Ripley's K function tests for spatial clustering/dispersion by 
comparing
+   * observed vs expected point patterns at increasing distance bands.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def calculateSpatialColocation(sedona: SparkSession): Unit =
   {
 
@@ -88,22 +108,22 @@ object RddExample {
     // Use the center point of area landmarks to check co-location. This is 
required by Ripley's K function.
     arealmRDD.rawSpatialRDD = arealmRDD.rawSpatialRDD.rdd.map[Geometry](f=>
     {
-      var geom = f.getCentroid
+      val geom = f.getCentroid
       // Copy non-spatial attributes
       geom.setUserData(f.getUserData)
       geom
     })
 
     // The following two lines are optional. The purpose is to show the 
structure of the shapefile.
-    var arealmDf = Adapter.toDf(arealmRDD, sedona)
+    val arealmDf = Adapter.toDf(arealmRDD, sedona)
     arealmDf.show()
 
     // Prepare NYC taxi trips. Only use the taxi trips' pickup points
-    var tripDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+    val tripDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
     tripDf.show() // Optional
     // Convert from DataFrame to RDD. This can also be done directly through 
Sedona RDD API.
     tripDf.createOrReplaceTempView("tripdf")
-    var tripRDD = Adapter.toSpatialRdd(sedona.sql("select 
ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 
14))) as point, 'def' as trip_attr from tripdf")
+    val tripRDD = Adapter.toSpatialRdd(sedona.sql("select 
ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 
14))) as point, 'def' as trip_attr from tripdf")
       , "point")
 
     // Convert the Coordinate Reference System from degree-based to 
meter-based. This returns the accurate distance calculate.
@@ -127,27 +147,27 @@ object RddExample {
     val beginDistance = 0.0
     var currentDistance = 0.0
 
-    // Start the iteration
+    // Start the iteration - test multiple distance bands
     println("distance(meter),observedL,difference,coLocationStatus")
     for (i <- 1 to iterationTimes)
     {
       currentDistance = beginDistance + i*distanceIncrement
 
-      var bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
+      val bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
       bufferedArealmRDD.spatialPartitioning(tripRDD.getPartitioner)
       //    Run Sedona Distance Join Query
-      var adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, 
bufferedArealmRDD,true,true)
+      val adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, 
bufferedArealmRDD,true,true)
 
       //      Uncomment the following two lines if you want to see what the 
join result looks like in SparkSQL
       //      import scala.collection.JavaConversions._
-      //      var adjacentMatrixDf = Adapter.toDf(adjacentMatrix, 
arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
+      //      val adjacentMatrixDf = Adapter.toDf(adjacentMatrix, 
arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
       //      adjacentMatrixDf.show()
 
-      var observedK = 
adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
-      var observedL = Math.sqrt(observedK/Math.PI)
-      var expectedL = currentDistance
-      var colocationDifference = observedL  - expectedL
-      var colocationStatus = {if (colocationDifference>0) "Co-located" else 
"Dispersed"}
+      val observedK = 
adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
+      val observedL = Math.sqrt(observedK/Math.PI)
+      val expectedL = currentDistance
+      val colocationDifference = observedL  - expectedL
+      val colocationStatus = {if (colocationDifference>0) "Co-located" else 
"Dispersed"}
 
       
println(s"""$currentDistance,$observedL,$colocationDifference,$colocationStatus""")
     }
diff --git a/examples/spark-sql/src/main/scala/SqlExample.scala 
b/examples/spark-sql/src/main/scala/SqlExample.scala
index 367f06160f..ed34c08d2f 100644
--- a/examples/spark-sql/src/main/scala/SqlExample.scala
+++ b/examples/spark-sql/src/main/scala/SqlExample.scala
@@ -17,42 +17,46 @@
  * under the License.
  */
 
-import Main.resourceFolder
-import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
-import org.apache.sedona.core.spatialRDD.SpatialRDD
 import org.apache.sedona.core.utils.SedonaConf
-import org.apache.sedona.sql.utils.Adapter
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
 
 
 object SqlExample {
 
+  val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
   val csvPolygonInputLocation = resourceFolder + "testenvelope.csv"
   val csvPointInputLocation = resourceFolder + "testpoint.csv"
   val shapefileInputLocation = resourceFolder + "shapefiles/dbf"
   val rasterdatalocation = resourceFolder + "raster/"
 
+  /**
+   * Demonstrates predicate pushdown optimization and range join queries with 
spatial indexing.
+   * Tests ST_Contains predicate with polygon and point data, including 
spatial filter pushdown.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def testPredicatePushdownAndRangeJonQuery(sedona: SparkSession):Unit =
   {
     val sedonaConf = new SedonaConf(sedona.conf)
     println(sedonaConf)
 
-    var polygonCsvDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
+    val polygonCsvDf = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
     polygonCsvDf.createOrReplaceTempView("polygontable")
     polygonCsvDf.show()
-    var polygonDf = sedona.sql("select 
ST_PolygonFromEnvelope(cast(polygontable._c0 as 
Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 
as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape 
from polygontable")
+    val polygonDf = sedona.sql("select 
ST_PolygonFromEnvelope(cast(polygontable._c0 as 
Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 
as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape 
from polygontable")
     polygonDf.createOrReplaceTempView("polygondf")
     polygonDf.show()
 
-    var pointCsvDF = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
     pointCsvDF.createOrReplaceTempView("pointtable")
     pointCsvDF.show()
-    var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from 
pointtable")
+    val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from 
pointtable")
     pointDf.createOrReplaceTempView("pointdf")
     pointDf.show()
 
-    var rangeJoinDf = sedona.sql("select * from polygondf, pointdf where 
ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
+    val rangeJoinDf = sedona.sql("select * from polygondf, pointdf where 
ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
       "and ST_Contains(ST_PolygonFromEnvelope(1.0,101.0,501.0,601.0), 
polygondf.polygonshape)")
 
     // Write result to GeoParquet file
@@ -62,41 +66,53 @@ object SqlExample {
     assert (rangeJoinDf.count()==500)
   }
 
+  /**
+   * Demonstrates distance join query that finds all point pairs within a 
specified distance.
+   * Uses ST_Distance predicate with distance-based join optimization.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def testDistanceJoinQuery(sedona: SparkSession): Unit =
   {
     val sedonaConf = new SedonaConf(sedona.conf)
     println(sedonaConf)
 
-    var pointCsvDF1 = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF1 = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
     pointCsvDF1.createOrReplaceTempView("pointtable")
     pointCsvDF1.show()
-    var pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from 
pointtable")
+    val pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from 
pointtable")
     pointDf1.createOrReplaceTempView("pointdf1")
     pointDf1.show()
 
-    var pointCsvDF2 = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF2 = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
     pointCsvDF2.createOrReplaceTempView("pointtable")
     pointCsvDF2.show()
-    var pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from 
pointtable")
+    val pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from 
pointtable")
     pointDf2.createOrReplaceTempView("pointdf2")
     pointDf2.show()
 
-    var distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where 
ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
+    val distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where 
ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
     distanceJoinDf.explain()
     distanceJoinDf.show(10)
     assert (distanceJoinDf.count()==2998)
   }
 
+  /**
+   * Demonstrates spatial aggregate function ST_Envelope_Aggr to compute 
bounding box of point set.
+   * Validates the computed envelope matches the expected boundary.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def testAggregateFunction(sedona: SparkSession): Unit =
   {
     val sedonaConf = new SedonaConf(sedona.conf)
     println(sedonaConf)
 
-    var pointCsvDF = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
     pointCsvDF.createOrReplaceTempView("pointtable")
-    var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from 
pointtable")
+    val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from 
pointtable")
     pointDf.createOrReplaceTempView("pointdf")
-    var boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark) 
from pointdf")
+    val boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark) 
from pointdf")
     val coordinates:Array[Coordinate] = new Array[Coordinate](5)
     coordinates(0) = new Coordinate(1.1,101.1)
     coordinates(1) = new Coordinate(1.1,1100.1)
@@ -108,25 +124,92 @@ object SqlExample {
     
assert(boundary.take(1)(0).get(0)==geometryFactory.createPolygon(coordinates))
   }
 
+  /**
+   * Demonstrates reading shapefiles using the modern DataFrame-based reader.
+   * Shows how to load shapefile data and query geometry and attribute fields.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def testShapefileConstructor(sedona: SparkSession): Unit =
   {
-    var spatialRDD = new SpatialRDD[Geometry]
-    spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, 
shapefileInputLocation)
-    var rawSpatialDf = Adapter.toDf(spatialRDD,sedona)
-    rawSpatialDf.createOrReplaceTempView("rawSpatialDf")
-    var spatialDf = sedona.sql("""
+    // Read shapefile using the DataFrame-based reader
+    val spatialDf = 
sedona.read.format("shapefile").load(shapefileInputLocation)
+    spatialDf.createOrReplaceTempView("rawSpatialDf")
+
+    // Select specific columns
+    val resultDf = sedona.sql("""
                                        | SELECT geometry, STATEFP, COUNTYFP
                                        | FROM rawSpatialDf
                                      """.stripMargin)
-    spatialDf.show()
-    spatialDf.printSchema()
+    resultDf.show()
+    resultDf.printSchema()
   }
 
+  /**
+   * Demonstrates raster data I/O and map algebra operations.
+   * Loads GeoTIFF raster data and performs various raster operations.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
   def testRasterIOAndMapAlgebra(sedona: SparkSession): Unit = {
-    var df = sedona.read.format("binaryFile").option("dropInvalid", 
true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster", 
"path")
+    val df = sedona.read.format("binaryFile").option("dropInvalid", 
true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster", 
"path")
     df.printSchema()
     df.show()
     df.selectExpr("RS_Metadata(raster) as metadata", "RS_GeoReference(raster) 
as georef", "RS_NumBands(raster) as numBands").show(false)
     df.selectExpr("RS_AddBand(raster, raster, 1) as raster_extraband").show()
   }
+
+  /**
+   * Demonstrates writing spatial DataFrame to GeoParquet format.
+   * GeoParquet is a cloud-native geospatial data format based on Apache 
Parquet.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def testGeoParquetWriter(sedona: SparkSession): Unit = {
+    // Create a sample DataFrame with geometries
+    val pointCsvDF = 
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    pointCsvDF.createOrReplaceTempView("pointtable")
+    val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as 
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as geometry from 
pointtable")
+
+    // Write to GeoParquet format
+    val geoParquetOutputPath = "target/test-classes/output/points.geoparquet"
+    pointDf.write
+      .format("geoparquet")
+      .mode(SaveMode.Overwrite)
+      .save(geoParquetOutputPath)
+
+    println(s"GeoParquet file written to: $geoParquetOutputPath")
+    pointDf.show(5)
+  }
+
+  /**
+   * Demonstrates reading GeoParquet files and performing spatial operations.
+   * Shows how to load GeoParquet data and apply spatial transformations.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def testGeoParquetReader(sedona: SparkSession): Unit = {
+    // First, ensure we have a GeoParquet file by writing one
+    testGeoParquetWriter(sedona)
+
+    // Read GeoParquet file
+    val geoParquetInputPath = "target/test-classes/output/points.geoparquet"
+    val geoParquetDf = sedona.read
+      .format("geoparquet")
+      .load(geoParquetInputPath)
+
+    println(s"GeoParquet file read from: $geoParquetInputPath")
+    geoParquetDf.printSchema()
+    geoParquetDf.show(5)
+
+    // Perform spatial operations on the loaded data
+    geoParquetDf.createOrReplaceTempView("geoparquet_points")
+    val bufferedDf = sedona.sql("""
+                                    | SELECT ST_Buffer(geometry, 0.1) as 
buffered_geometry
+                                    | FROM geoparquet_points
+                                  """.stripMargin)
+
+    println("Applied spatial operations on GeoParquet data:")
+    bufferedDf.show(5)
+  }
 }
diff --git a/examples/spark-sql/src/main/scala/VizExample.scala 
b/examples/spark-sql/src/main/scala/VizExample.scala
index b7b333a156..1108f4a09d 100644
--- a/examples/spark-sql/src/main/scala/VizExample.scala
+++ b/examples/spark-sql/src/main/scala/VizExample.scala
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-import Main.resourceFolder
 import org.apache.sedona.common.enums.FileDataSplitter
 import org.apache.sedona.core.enums.{GridType, IndexType}
 import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -33,6 +32,8 @@ import java.awt.Color
 
 object VizExample {
 
+  val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
   val demoOutputPath = "target/demo"
 
   val scatterPlotOutputPath = System.getProperty("user.dir") + "/" + 
demoOutputPath + "/scatterplot"
@@ -55,27 +56,42 @@ object VizExample {
   val PolygonNumPartitions = 5
   val USMainLandBoundary = new Envelope(-126.790180, -64.630926, 24.863836, 
50.000)
 
-  def buildScatterPlot(sedona: SparkSession): Boolean = {
+  /**
+   * Creates a scatter plot visualization of polygon data.
+   * Generates a PNG image showing spatial distribution of polygons.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def buildScatterPlot(sedona: SparkSession): Unit = {
     val spatialRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation, 
PolygonSplitter, false, PolygonNumPartitions)
-    var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, 
false)
+    val visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, 
false)
     visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
     visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
-    var imageGenerator = new ImageGenerator
+    val imageGenerator = new ImageGenerator
     
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, 
scatterPlotOutputPath, ImageType.PNG)
-    true
   }
 
-  def buildHeatMap(sedona: SparkSession): Boolean = {
+  /**
+   * Creates a heat map visualization showing density of rectangle geometries.
+   * Generates a PNG image with heat intensity based on spatial clustering.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def buildHeatMap(sedona: SparkSession): Unit = {
     val spatialRDD = new RectangleRDD(sedona.sparkContext, 
RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
     val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, 
false, 2)
     visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
     val imageGenerator = new ImageGenerator
     
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, 
heatMapOutputPath, ImageType.PNG)
-    true
   }
 
-
-  def buildChoroplethMap(sedona: SparkSession): Boolean = {
+  /**
+   * Creates a choropleth map by performing spatial join and visualizing join 
counts.
+   * Combines heat map with polygon overlay to show spatial relationships.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def buildChoroplethMap(sedona: SparkSession): Unit = {
     val spatialRDD = new PointRDD(sedona.sparkContext, PointInputLocation, 
PointOffset, PointSplitter, false, PointNumPartitions)
     val queryRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation, 
PolygonSplitter, false, PolygonNumPartitions)
     spatialRDD.spatialPartitioning(GridType.KDBTREE)
@@ -92,20 +108,30 @@ object VizExample {
     overlayOperator.JoinImage(frontImage.rasterImage)
     val imageGenerator = new ImageGenerator
     imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, 
choroplethMapOutputPath, ImageType.PNG)
-    true
   }
 
-  def parallelFilterRenderNoStitch(sedona: SparkSession): Boolean = {
+  /**
+   * Demonstrates parallel rendering without image stitching.
+   * Creates tiled heat map images for distributed rendering.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def parallelFilterRenderNoStitch(sedona: SparkSession): Unit = {
     val spatialRDD = new RectangleRDD(sedona.sparkContext, 
RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
     val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, 
false, 2, 4, 4, true, true)
     visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
     val imageGenerator = new ImageGenerator
     
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage,
 parallelFilterRenderOutputPath, ImageType.PNG)
-    true
   }
 
-  def sqlApiVisualization(sedona: SparkSession): Boolean = {
-    var pointDf = sedona.read.format("csv").option("delimiter", 
",").option("header", "false").load(PointInputLocation)
+  /**
+   * Demonstrates visualization using Sedona SQL API with pixelization and 
rendering.
+   * Creates heat map using SQL functions for rasterization and colorization.
+   *
+   * @param sedona SparkSession with Sedona extensions enabled
+   */
+  def sqlApiVisualization(sedona: SparkSession): Unit = {
+    val pointDf = sedona.read.format("csv").option("delimiter", 
",").option("header", "false").load(PointInputLocation)
     pointDf.selectExpr("ST_Point(cast(_c0 as Decimal(24,20)),cast(_c1 as 
Decimal(24,20))) as shape")
       
.filter("ST_Contains(ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000),shape)").createOrReplaceTempView("pointtable")
     sedona.sql(
@@ -127,8 +153,8 @@ object VizExample {
                                |SELECT ST_Render(pixel, ST_Colorize(weight, 
(SELECT max(weight) FROM pixelaggregates), 'red')) AS image
                                |FROM pixelaggregates
                        """.stripMargin)
-    var image = 
sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
-    var imageGenerator = new ImageGenerator
+    val image = 
sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
+    val imageGenerator = new ImageGenerator
     imageGenerator.SaveRasterImageAsLocalFile(image, sqlApiOutputPath, 
ImageType.PNG)
     sedona.sql(
       """
@@ -137,7 +163,6 @@ object VizExample {
                                |FROM images
                        """.stripMargin)
     sedona.table("imagestring").show()
-    true
   }
 
 }
diff --git a/examples/spark-sql/src/test/scala/testFunctions.scala 
b/examples/spark-sql/src/test/scala/testFunctions.scala
new file mode 100644
index 0000000000..ac6d19a3aa
--- /dev/null
+++ b/examples/spark-sql/src/test/scala/testFunctions.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.sedona.spark.SedonaContext
+import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+import org.apache.spark.sql.SparkSession
+
+class testFunctions extends AnyFunSuite with BeforeAndAfterAll {
+
+  var sedona: SparkSession = _
+
+  override def beforeAll(): Unit = {
+    Logger.getRootLogger().setLevel(Level.WARN)
+
+    // Main object initialization happens on first access
+    // Access resourceFolder to trigger Main's initialization
+    println(s"Resource folder: ${Main.resourceFolder}")
+
+    // Create Spark session with driver JVM options for Java module access
+    val config = SedonaContext.builder().appName("SedonaSQL-test")
+      .master("local[*]")
+      .config("spark.driver.extraJavaOptions",
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+        "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+        "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+        "--add-opens=java.base/java.util=ALL-UNNAMED " +
+        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+        "--add-opens=java.base/java.io=ALL-UNNAMED " +
+        "--add-opens=java.base/java.net=ALL-UNNAMED " +
+        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " +
+        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED " +
+        "--add-opens=java.base/sun.security.action=ALL-UNNAMED " +
+        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED")
+      .config("spark.kryo.registrator", 
classOf[SedonaVizKryoRegistrator].getName)
+      .getOrCreate()
+    sedona = SedonaContext.create(config)
+
+    SedonaVizRegistrator.registerAll(sedona)
+  }
+
+  override def afterAll(): Unit = {
+    if (sedona != null) {
+      sedona.stop()
+    }
+  }
+
+  test("SqlExample - testPredicatePushdownAndRangeJonQuery") {
+    SqlExample.testPredicatePushdownAndRangeJonQuery(sedona)
+  }
+
+  test("SqlExample - testDistanceJoinQuery") {
+    SqlExample.testDistanceJoinQuery(sedona)
+  }
+
+  test("SqlExample - testAggregateFunction") {
+    SqlExample.testAggregateFunction(sedona)
+  }
+
+  test("SqlExample - testShapefileConstructor") {
+    SqlExample.testShapefileConstructor(sedona)
+  }
+
+  test("SqlExample - testRasterIOAndMapAlgebra") {
+    SqlExample.testRasterIOAndMapAlgebra(sedona)
+  }
+
+  test("RddExample - visualizeSpatialColocation") {
+    RddExample.visualizeSpatialColocation(sedona)
+  }
+
+  test("RddExample - calculateSpatialColocation") {
+    RddExample.calculateSpatialColocation(sedona)
+  }
+
+  test("VizExample - buildScatterPlot") {
+    VizExample.buildScatterPlot(sedona)
+    succeed // Test passes if function completes without exception
+  }
+
+  test("VizExample - buildHeatMap") {
+    VizExample.buildHeatMap(sedona)
+    succeed // Test passes if function completes without exception
+  }
+
+  test("VizExample - buildChoroplethMap") {
+    VizExample.buildChoroplethMap(sedona)
+    succeed // Test passes if function completes without exception
+  }
+
+  test("VizExample - parallelFilterRenderNoStitch") {
+    VizExample.parallelFilterRenderNoStitch(sedona)
+    succeed // Test passes if function completes without exception
+  }
+
+  test("VizExample - sqlApiVisualization") {
+    VizExample.sqlApiVisualization(sedona)
+    succeed // Test passes if function completes without exception
+  }
+}

Reply via email to