This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch sedona-context
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 03cb8c59047093970dd52317f82087ef4e87c0f9
Author: Jia Yu <[email protected]>
AuthorDate: Fri Jun 2 22:25:12 2023 -0700

    Add SedonaContext
---
 R/.gitignore                                       |   1 +
 docs/api/.gitignore                                |   1 +
 docs/api/sql/Overview.md                           |  18 +--
 docs/setup/install-python.md                       |  22 +++-
 docs/tutorial/flink/sql.md                         |  26 ++--
 docs/tutorial/geopandas-shapely.md                 |  38 +++---
 docs/tutorial/raster.md                            |   4 +-
 docs/tutorial/rdd.md                               |  82 +++---------
 docs/tutorial/sql-pure-sql.md                      |   2 +-
 docs/tutorial/sql.md                               | 145 ++++++++++++++++-----
 docs/tutorial/viz.md                               |  38 ++++--
 ...onaFlinkRegistrator.java => SedonaContext.java} |  68 ++++++----
 .../sedona/flink/SedonaFlinkRegistrator.java       |   9 +-
 .../java/org/apache/sedona/flink/TestBase.java     |   4 +-
 python/sedona/core/jvm/config.py                   |  85 ++++++++++--
 python/sedona/register/geo_registrator.py          |   8 +-
 python/sedona/register/java_libs.py                |   1 +
 .../geo_registrator.py => spark/SedonaContext.py}  |  45 +++----
 python/sedona/spark/__init__.py                    |  41 ++++++
 python/tests/test_base.py                          |  15 +--
 .../SedonaContext.scala}                           |  32 +++--
 .../apache/sedona/sql/SedonaSqlExtensions.scala    |   4 +-
 .../sedona/sql/utils/SedonaSQLRegistrator.scala    |  21 +--
 .../org/apache/sedona/sql/adapterTestJava.java     |  12 +-
 .../org/apache/sedona/sql/TestBaseScala.scala      |  11 +-
 .../org/apache/sedona/sql/TestBaseScala.scala      |  11 +-
 .../org/apache/sedona/sql/TestBaseScala.scala      |  10 +-
 .../org/apache/sedona/viz/sql/TestBaseScala.scala  |  10 +-
 28 files changed, 463 insertions(+), 301 deletions(-)

diff --git a/R/.gitignore b/R/.gitignore
index 31210c8e..ebd7c760 100644
--- a/R/.gitignore
+++ b/R/.gitignore
@@ -46,3 +46,4 @@ tests/testthat/testthat-problems\.rds
 # Generated by roxygen2
 docs
 inst/doc
+/apache.sedona.Rcheck/
diff --git a/docs/api/.gitignore b/docs/api/.gitignore
index 532523db..983d1828 100644
--- a/docs/api/.gitignore
+++ b/docs/api/.gitignore
@@ -1 +1,2 @@
 /rdocs/
+/javadoc/
diff --git a/docs/api/sql/Overview.md b/docs/api/sql/Overview.md
index 855d517a..62215c92 100644
--- a/docs/api/sql/Overview.md
+++ b/docs/api/sql/Overview.md
@@ -3,7 +3,7 @@
 ## Function list
 SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. It includes four kinds 
of SQL operators as follows. All these operators can be directly called through:
 ```scala
-var myDataFrame = sparkSession.sql("YOUR_SQL")
+var myDataFrame = sedona.sql("YOUR_SQL")
 ```
 
 Alternatively, `expr` and `selectExpr` can be used:
@@ -33,15 +33,15 @@ SedonaSQL supports SparkSQL query optimizer, documentation 
is [Here](../Optimize
 The detailed explanation is here [Write a SQL/DataFrame 
application](../../tutorial/sql.md).
 
 1. Add Sedona-core and Sedona-SQL into your project POM.xml or build.sbt
-2. Declare your Spark Session
+2. Create your Sedona config if you want to customize your SparkSession.
 ```scala
-sparkSession = SparkSession.builder().
-      config("spark.serializer","org.apache.spark.serializer.KryoSerializer").
-      config("spark.kryo.registrator", 
"org.apache.sedona.core.serde.SedonaKryoRegistrator").
-      master("local[*]").appName("mySedonaSQLdemo").getOrCreate()
+import org.apache.sedona.spark.SedonaContext
+val config = SedonaContext.config().
+    master("local[*]").appName("SedonaSQL").
+    getOrCreate()
 ```
-3. Add the following line after your SparkSession declaration:
+3. Add the following line after your Sedona context declaration:
 ```scala
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
-SedonaSQLRegistrator.registerAll(sparkSession)
+import org.apache.sedona.spark.SedonaContext
+val sedona = SedonaContext.create(config)
 ```
diff --git a/docs/setup/install-python.md b/docs/setup/install-python.md
index 00505616..35749ea7 100644
--- a/docs/setup/install-python.md
+++ b/docs/setup/install-python.md
@@ -43,8 +43,27 @@ You can get it using one of the following methods:
 1. Compile from the source within main project directory and copy it (in 
`spark-shaded/target` folder) to SPARK_HOME/jars/ folder ([more 
details](../compile))
 
 2. Download from [GitHub release](https://github.com/apache/sedona/releases) 
and copy it to SPARK_HOME/jars/ folder
-3. Call the [Maven Central coordinate](../maven-coordinates) in your python 
program. For example, in PySparkSQL
+3. Call the [Maven Central coordinate](../maven-coordinates) in your python 
program. For example,
+==Sedona >= 1.4.1==
+
+```python
+from sedona.spark import *
+config = SedonaContext.config(). \
+    config('spark.jars.packages',
+           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:{{ 
sedona.current_version }},'
+           'org.datasyslab:geotools-wrapper:{{ sedona.current_geotools }}'). \
+    getOrCreate()
+sedona = SedonaContext.create(config)
+```
+
+==Sedona < 1.4.1==
+
+SedonaRegistrator is deprecated in Sedona 1.4.1 and later versions. Please use 
the above method instead.
+
 ```python
+from pyspark.sql import SparkSession
+from sedona.register import SedonaRegistrator
+from sedona.utils import SedonaKryoRegistrator, KryoSerializer
 spark = SparkSession. \
     builder. \
     appName('appName'). \
@@ -54,6 +73,7 @@ spark = SparkSession. \
            'org.apache.sedona:sedona-spark-shaded-3.0_2.12:{{ 
sedona.current_version }},'
            'org.datasyslab:geotools-wrapper:{{ sedona.current_geotools }}'). \
     getOrCreate()
+SedonaRegistrator.registerAll(spark)
 ```
 
 !!!warning
diff --git a/docs/tutorial/flink/sql.md b/docs/tutorial/flink/sql.md
index d4fc21fe..0207f4ed 100644
--- a/docs/tutorial/flink/sql.md
+++ b/docs/tutorial/flink/sql.md
@@ -22,10 +22,20 @@ EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 ```
 
-## Register SedonaSQL
+## Initiate SedonaContext
 
 Add the following line after your `StreamExecutionEnvironment` and 
`StreamTableEnvironment` declaration
 
+==Sedona >= 1.4.1==
+
+```java
+StreamTableEnvironment sedona = SedonaContext.create(env, tableEnv);
+```
+
+==Sedona < 1.4.1==
+
+The following method has been deprecated since Sedona 1.4.1. Please use the 
method above to create your SedonaContext.
+
 ```java
 SedonaFlinkRegistrator.registerType(env);
 SedonaFlinkRegistrator.registerFunc(tableEnv);
@@ -63,8 +73,8 @@ Assume you have a Flink Table `tbl` like this:
 You can create a Table with a Geometry type column as follows:
 
 ```java
-tableEnv.createTemporaryView("myTable", tbl)
-Table geomTbl = tableEnv.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) as 
geom_polygon, name_polygon FROM myTable")
+sedona.createTemporaryView("myTable", tbl)
+Table geomTbl = sedona.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) as 
geom_polygon, name_polygon FROM myTable")
 geomTbl.execute().print()
 ```
 
@@ -115,7 +125,7 @@ Sedona doesn't control the coordinate unit (degree-based or 
meter-based) of all
 To convert Coordinate Reference System of the Geometry column created before, 
use the following code:
 
 ```java
-Table geomTbl3857 = tableEnv.sqlQuery("SELECT ST_Transform(countyshape, 
"epsg:4326", "epsg:3857") AS geom_polygon, name_polygon FROM myTable")
+Table geomTbl3857 = sedona.sqlQuery("SELECT ST_Transform(countyshape, 
'epsg:4326', 'epsg:3857') AS geom_polygon, name_polygon FROM myTable")
 geomTbl3857.execute().print()
 ```
 
@@ -176,7 +186,7 @@ Use ==ST_Contains==, ==ST_Intersects== and so on to run a 
range query over a sin
 The following example finds all counties that are within the given polygon:
 
 ```java
-geomTable = tableEnv.sqlQuery(
+geomTable = sedona.sqlQuery(
   "
     SELECT *
     FROM spatialdf
@@ -195,7 +205,7 @@ Use ==ST_Distance== to calculate the distance and rank the 
distance.
 The following code returns the 5 nearest neighbor of the given polygon.
 
 ```java
-geomTable = tableEnv.sqlQuery(
+geomTable = sedona.sqlQuery(
   "
     SELECT countyname, 
ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS 
distance
     FROM geomTable
@@ -308,7 +318,7 @@ Since the coordinates are in the longitude and latitude 
system, so the unit of `
 Use TableEnv's toDataStream function
 
 ```java
-DataStream<Row> geomStream = tableEnv.toDataStream(geomTable)
+DataStream<Row> geomStream = sedona.toDataStream(geomTable)
 ```
 
 ### Retrieve Geometries
@@ -473,5 +483,5 @@ DataStream<Row> geomStream = text.map(new 
MapFunction<String, Row>() {
 
 Use TableEnv's fromDataStream function, with two column names `geom` and 
`geom_name`.
 ```java
-Table geomTable = tableEnv.fromDataStream(geomStream, "geom", "geom_name")
+Table geomTable = sedona.fromDataStream(geomStream, "geom", "geom_name")
 ```
diff --git a/docs/tutorial/geopandas-shapely.md 
b/docs/tutorial/geopandas-shapely.md
index 1e9adad0..d41da08d 100644
--- a/docs/tutorial/geopandas-shapely.md
+++ b/docs/tutorial/geopandas-shapely.md
@@ -14,18 +14,16 @@ Loading the data from shapefile using geopandas read_file 
method and create Spar
 ```python
 
 import geopandas as gpd
-from pyspark.sql import SparkSession
+from sedona.spark import *
 
-from sedona.register import SedonaRegistrator
-
-spark = SparkSession.builder.\
+config = SedonaContext.config().\
       getOrCreate()
 
-SedonaRegistrator.registerAll(spark)
+sedona = SedonaContext.create(config)
 
 gdf = gpd.read_file("gis_osm_pois_free_1.shp")
 
-spark.createDataFrame(
+sedona.createDataFrame(
   gdf
 ).show()
 
@@ -55,16 +53,14 @@ Reading data with Spark and converting to GeoPandas
 ```python
 
 import geopandas as gpd
-from pyspark.sql import SparkSession
-
-from sedona.register import SedonaRegistrator
+from sedona.spark import *
 
-spark = SparkSession.builder.\
-    getOrCreate()
+config = SedonaContext.config().\
+       getOrCreate()
 
-SedonaRegistrator.registerAll(spark)
+sedona = SedonaContext.create(config)
 
-counties = spark.\
+counties = sedona.\
     read.\
     option("delimiter", "|").\
     option("header", "true").\
@@ -72,7 +68,7 @@ counties = spark.\
 
 counties.createOrReplaceTempView("county")
 
-counties_geom = spark.sql(
+counties_geom = sedona.sql(
     "SELECT *, st_geomFromWKT(geom) as geometry from county"
 )
 
@@ -119,7 +115,7 @@ Schema for target table with integer id and geometry type 
can be defined as foll
 
 from pyspark.sql.types import IntegerType, StructField, StructType
 
-from sedona.sql.types import GeometryType
+from sedona.spark import *
 
 schema = StructType(
     [
@@ -144,7 +140,7 @@ data = [
 ]
 
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 )
@@ -183,7 +179,7 @@ data = [
     [1, MultiPoint([[19.511463, 51.765158], [19.446408, 51.779752]])]
 ]
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 ).show(1, False)
@@ -213,7 +209,7 @@ data = [
     [1, LineString(line)]
 ]
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 )
@@ -245,7 +241,7 @@ data = [
     [1, MultiLineString([line1, line2])]
 ]
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 )
@@ -284,7 +280,7 @@ data = [
     [1, polygon]
 ]
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 )
@@ -324,7 +320,7 @@ data = [
     [1, MultiPolygon(polygons)]
 ]
 
-gdf = spark.createDataFrame(
+gdf = sedona.createDataFrame(
     data,
     schema
 )
diff --git a/docs/tutorial/raster.md b/docs/tutorial/raster.md
index 3ce110e2..0b71149a 100644
--- a/docs/tutorial/raster.md
+++ b/docs/tutorial/raster.md
@@ -3,8 +3,8 @@ Starting from `v1.1.0`, Sedona SQL supports raster data sources 
and raster opera
 ## Initial setup
 
 1. [Set up dependencies](../sql/#set-up-dependencies)
-2. [Initiate Spark session](../sql/#initiate-sparksession)
-3. [Register SedonaSQL](../sql/#register-sedonasql)
+2. [Create Sedona config](../sql/#create-sedona-config)
+3. [Initiate SedonaContext](../sql/#initiate-sedonacontext)
 
 ## API docs
 
diff --git a/docs/tutorial/rdd.md b/docs/tutorial/rdd.md
index 077415dc..584d8053 100644
--- a/docs/tutorial/rdd.md
+++ b/docs/tutorial/rdd.md
@@ -3,65 +3,15 @@ The page outlines the steps to create Spatial RDDs and run 
spatial queries using
 
 ## Set up dependencies
 
-=== "Scala/Java"
-
-       1. Read [Sedona Maven Central 
coordinates](../setup/maven-coordinates.md) and add Sedona dependencies in 
build.sbt or pom.xml.
-       2. Add [Apache Spark 
core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11), 
[Apache 
SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql) in 
build.sbt or pom.xml.
-       3. Please see [RDD example project](../demo/)
-
-=== "Python"
-
-       1. Please read [Quick start](../../setup/install-python) to install 
Sedona Python.
-       2. This tutorial is based on [Sedona Core Jupyter Notebook 
example](../jupyter-notebook). You can interact with Sedona Python Jupyter 
notebook immediately on Binder. Click 
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/apache/sedona/HEAD?filepath=binder)
 to interact with Sedona Python Jupyter notebook immediately on Binder.
-
-## Initiate SparkContext
-
-=== "Scala"
-
-       ```scala
-       val conf = new SparkConf()
-       conf.setAppName("SedonaRunnableExample") // Change this to a proper name
-       conf.setMaster("local[*]") // Delete this if run in cluster mode
-       // Enable Sedona custom Kryo serializer
-       conf.set("spark.serializer", classOf[KryoSerializer].getName) // 
org.apache.spark.serializer.KryoSerializer
-       conf.set("spark.kryo.registrator", 
classOf[SedonaKryoRegistrator].getName) // 
org.apache.sedona.core.serde.SedonaKryoRegistrator
-       val sc = new SparkContext(conf)
-       ```
-       
-       If you add ==the Sedona full dependencies== as suggested above, please 
use the following two lines to enable Sedona Kryo serializer instead:
-       ```scala
-       conf.set("spark.serializer", classOf[KryoSerializer].getName) // 
org.apache.spark.serializer.KryoSerializer
-       conf.set("spark.kryo.registrator", 
classOf[SedonaVizKryoRegistrator].getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
-       ```
-
-=== "Java"
+Please refer to [Set up dependencies](../sql/#set-up-dependencies) to set up 
dependencies.
 
-       ```java
-       SparkConf conf = new SparkConf()
-       conf.setAppName("SedonaRunnableExample") // Change this to a proper name
-       conf.setMaster("local[*]") // Delete this if run in cluster mode
-       // Enable Sedona custom Kryo serializer
-       conf.set("spark.serializer", KryoSerializer.class.getName) // 
org.apache.spark.serializer.KryoSerializer
-       conf.set("spark.kryo.registrator", SedonaKryoRegistrator.class.getName) 
// org.apache.sedona.core.serde.SedonaKryoRegistrator
-       SparkContext sc = new SparkContext(conf)
-       ```
-       
-       If you use SedonaViz with SedonaRDD, please use the following two lines 
to enable Sedona Kryo serializer instead:
-       ```scala
-       conf.set("spark.serializer", KryoSerializer.class.getName) // 
org.apache.spark.serializer.KryoSerializer
-       conf.set("spark.kryo.registrator", 
SedonaVizKryoRegistrator.class.getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
-       ```
+## Create Sedona config
 
-=== "Python"
+Please refer to [Create Sedona config](../sql/#create-sedona-config) to create 
a Sedona config.
 
-```python
-conf.set("spark.serializer", KryoSerializer.getName)
-conf.set("spark.kryo.registrator", SedonaKryoRegistrator.getName)
-sc = SparkContext(conf=conf)
-```
+## Initiate SedonaContext
 
-!!!warning
-       Sedona has a suite of well-written geometry and index serializers. 
Forgetting to enable these serializers will lead to high memory consumption.
+Please refer to [Initiate SedonaContext](../sql/#initiate-sedonacontext) to 
initiate a SedonaContext.
 
 ## Create a SpatialRDD
 
@@ -97,7 +47,7 @@ Use the following code to create a SpatialRDD
        val wktColumn = 0 // The WKT string starts from Column 0
        val allowTopologyInvalidGeometries = true // Optional
        val skipSyntaxInvalidGeometries = false // Optional
-       val spatialRDD = WktReader.readToGeometryRDD(sparkSession.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
+       val spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
        ```
 
 === "Java"
@@ -107,7 +57,7 @@ Use the following code to create a SpatialRDD
        int wktColumn = 0 // The WKT string starts from Column 0
        boolean allowTopologyInvalidGeometries = true // Optional
        boolean skipSyntaxInvalidGeometries = false // Optional
-       SpatialRDD spatialRDD = 
WktReader.readToGeometryRDD(sparkSession.sparkContext, inputLocation, 
wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
+       SpatialRDD spatialRDD = 
WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
        ```
 
 === "Python"
@@ -144,7 +94,7 @@ Use the following code to create a generic SpatialRDD:
        val inputLocation = "/Download/polygon.json"
        val allowTopologyInvalidGeometries = true // Optional
        val skipSyntaxInvalidGeometries = false // Optional
-       val spatialRDD = 
GeoJsonReader.readToGeometryRDD(sparkSession.sparkContext, inputLocation, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
+       val spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
        ```
 
 === "Java"
@@ -153,7 +103,7 @@ Use the following code to create a generic SpatialRDD:
        String inputLocation = "/Download/polygon.json"
        boolean allowTopologyInvalidGeometries = true // Optional
        boolean skipSyntaxInvalidGeometries = false // Optional
-       SpatialRDD spatialRDD = 
GeoJsonReader.readToGeometryRDD(sparkSession.sparkContext, inputLocation, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
+       SpatialRDD spatialRDD = 
GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
        ```
        
 === "Python"
@@ -173,14 +123,14 @@ Use the following code to create a generic SpatialRDD:
 
        ```scala
        val shapefileInputLocation="/Download/myshapefile"
-       val spatialRDD = 
ShapefileReader.readToGeometryRDD(sparkSession.sparkContext, 
shapefileInputLocation)
+       val spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, 
shapefileInputLocation)
        ```
 
 === "Java"
 
        ```java
        String shapefileInputLocation="/Download/myshapefile"
-       SpatialRDD spatialRDD = 
ShapefileReader.readToGeometryRDD(sparkSession.sparkContext, 
shapefileInputLocation)
+       SpatialRDD spatialRDD = 
ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation)
        ```
 
 === "Python"
@@ -229,12 +179,12 @@ We use checkin.csv CSV file as the example. You can 
create a generic SpatialRDD
 
 1. Load data in SedonaSQL.
 ```scala
-var df = sparkSession.read.format("csv").option("header", 
"false").load(csvPointInputLocation)
+var df = sedona.read.format("csv").option("header", 
"false").load(csvPointInputLocation)
 df.createOrReplaceTempView("inputtable")
 ```
 2. Create a Geometry type column in SedonaSQL
 ```scala
-var spatialDf = sparkSession.sql(
+var spatialDf = sedona.sql(
        """
                |SELECT ST_Point(CAST(inputtable._c0 AS 
Decimal(24,20)),CAST(inputtable._c1 AS Decimal(24,20))) AS checkin
                |FROM inputtable
@@ -288,21 +238,21 @@ To convert Coordinate Reference System of an SpatialRDD, 
use the following code:
 === "Scala"
 
        ```scala
-       val objectRDD = WktReader.readToGeometryRDD(sparkSession.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
+       val objectRDD = WktReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
        objectRDD.CRSTransform("epsg:4326", "epsg:3857", false)
        ```
 
 === "Java"
 
        ```java
-       SpatialRDD objectRDD = 
WktReader.readToGeometryRDD(sparkSession.sparkContext, inputLocation, 
wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
+       SpatialRDD objectRDD = WktReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
        objectRDD.CRSTransform("epsg:4326", "epsg:3857", false)
        ```
 
 === "Python"
 
        ```python
-       objectRDD = WktReader.readToGeometryRDD(sparkSession.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
+       objectRDD = WktReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
        objectRDD.CRSTransform("epsg:4326", "epsg:3857", False)
        ```
 
diff --git a/docs/tutorial/sql-pure-sql.md b/docs/tutorial/sql-pure-sql.md
index 56a30702..c5b4e54e 100644
--- a/docs/tutorial/sql-pure-sql.md
+++ b/docs/tutorial/sql-pure-sql.md
@@ -29,7 +29,7 @@ Start `spark-sql` as following (replace `<VERSION>` with 
actual version, like, `
         If you are using Spark versions higher than 3.4, please replace the 
`3.4` in artifact names with the corresponding major.minor version of Spark.
 
 
-This will register all User Defined Tyeps, functions and optimizations in 
SedonaSQL and SedonaViz.
+This will register all Sedona types, functions and optimizations in SedonaSQL 
and SedonaViz.
 
 ## Load data
 
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 6a929527..9bec967d 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -6,21 +6,21 @@ SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. It 
includes four kinds of
 === "Scala"
 
        ```scala
-       var myDataFrame = sparkSession.sql("YOUR_SQL")
+       var myDataFrame = sedona.sql("YOUR_SQL")
        myDataFrame.createOrReplaceTempView("spatialDf")
        ```
 
 === "Java"
 
        ```java
-       Dataset<Row> myDataFrame = sparkSession.sql("YOUR_SQL")
+       Dataset<Row> myDataFrame = sedona.sql("YOUR_SQL")
        myDataFrame.createOrReplaceTempView("spatialDf")
        ```
        
 === "Python"
 
        ```python
-       myDataFrame = sparkSession.sql("YOUR_SQL")
+       myDataFrame = sedona.sql("YOUR_SQL")
        myDataFrame.createOrReplaceTempView("spatialDf")
        ```
 
@@ -39,8 +39,58 @@ Detailed SedonaSQL APIs are available here: [SedonaSQL 
API](../api/sql/Overview.
        1. Please read [Quick start](../../setup/install-python) to install 
Sedona Python.
        2. This tutorial is based on [Sedona SQL Jupyter Notebook 
example](../jupyter-notebook). You can interact with Sedona Python Jupyter 
notebook immediately on Binder. Click 
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/apache/sedona/HEAD?filepath=binder)
 to interact with Sedona Python Jupyter notebook immediately on Binder.
 
-## Initiate SparkSession
-Use the following code to initiate your SparkSession at the beginning:
+## Create Sedona config
+
+Use the following code to create your Sedona config at the beginning. If you 
already have a SparkSession (usually named `spark`) created by Wherobots/AWS 
EMR/Databricks, please skip this step and use `spark` directly.
+
+==Sedona >= 1.4.1==
+
+=== "Scala"
+
+       ```scala
+       import org.apache.sedona.spark.SedonaContext
+
+       val config = SedonaContext.config()
+       .master("local[*]") // Delete this if run in cluster mode
+       .appName("readTestScala") // Change this to a proper name
+       .getOrCreate()
+       ```
+       If you use SedonaViz together with SedonaSQL, please add the following 
line after `SedonaContext.config()` to enable Sedona Kryo serializer:
+       ```scala
+       .config("spark.kryo.registrator", 
classOf[SedonaVizKryoRegistrator].getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+       ```
+
+=== "Java"
+
+       ```java
+       import org.apache.sedona.spark.SedonaContext;
+
+       SparkSession config = SedonaContext.config()
+       .master("local[*]") // Delete this if run in cluster mode
+       .appName("readTestScala") // Change this to a proper name
+       .getOrCreate()
+       ```
+       If you use SedonaViz together with SedonaSQL, please add the following 
line after `SedonaContext.config()` to enable Sedona Kryo serializer:
+       ```java
+       .config("spark.kryo.registrator", 
SedonaVizKryoRegistrator.class.getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+       ```
+       
+=== "Python"
+
+       ```python
+       from sedona.spark import *
+
+       config = SedonaContext.config() .\
+           config('spark.jars.packages',
+                  'org.apache.sedona:sedona-spark-shaded-3.0_2.12:{{ 
sedona.current_version }},'
+                  'org.datasyslab:geotools-wrapper:{{ sedona.current_geotools 
}}'). \
+           getOrCreate()
+       ```
+    If you are using Spark versions >= 3.4, please replace the `3.0` in 
package name of sedona-spark-shaded with the corresponding major.minor version 
of Spark, such as `sedona-spark-shaded-3.4_2.12:{{ sedona.current_version }}`.
+
+==Sedona < 1.4.1==
+
+The following method has been deprecated since Sedona 1.4.1. Please use the 
method above to create your Sedona config.
 
 === "Scala"
 
@@ -75,7 +125,7 @@ Use the following code to initiate your SparkSession at the 
beginning:
        .config("spark.serializer", KryoSerializer.class.getName) // 
org.apache.spark.serializer.KryoSerializer
        .config("spark.kryo.registrator", 
SedonaVizKryoRegistrator.class.getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
        ```
-       
+
 === "Python"
 
        ```python
@@ -91,14 +141,39 @@ Use the following code to initiate your SparkSession at 
the beginning:
        ```
     If you are using Spark versions >= 3.4, please replace the `3.0` in 
package name of sedona-spark-shaded with the corresponding major.minor version 
of Spark, such as `sedona-spark-shaded-3.4_2.12:{{ sedona.current_version }}`.
 
-!!!warning
-       Sedona has a suite of well-written geometry and index serializers. 
Forgetting to enable these serializers will lead to high memory consumption and 
slow performance.
+## Initiate SedonaContext
 
+Add the following line after creating Sedona config. If you already have a 
SparkSession (usually named `spark`) created by Wherobots/AWS EMR/Databricks, 
please call `SedonaContext.create(spark)` instead.
 
+==Sedona >= 1.4.1==
 
-## Register SedonaSQL
+=== "Scala"
 
-Add the following line after your SparkSession declaration
+       ```scala
+       import org.apache.sedona.spark.SedonaContext
+
+       val sedona = SedonaContext.create(config)
+       ```
+
+=== "Java"
+
+       ```java
+       import org.apache.sedona.spark.SedonaContext;
+
+       SparkSession sedona = SedonaContext.create(config)
+       ```
+
+=== "Python"
+
+       ```python
+       from sedona.spark import *
+       
+       sedona = SedonaContext.create(config)
+       ```
+
+==Sedona < 1.4.1==
+
+The following method has been deprecated since Sedona 1.4.1. Please use the 
method above to create your SedonaContext.
 
 === "Scala"
 
@@ -120,8 +195,6 @@ Add the following line after your SparkSession declaration
        SedonaRegistrator.registerAll(spark)
        ```
 
-This function will register Sedona User Defined Type, User Defined Function 
and optimized join query strategy.
-
 You can also register everything by passing `--conf 
spark.sql.extensions=org.apache.sedona.sql.SedonaSqlExtensions` to 
`spark-submit` or `spark-shell`.
 
 ## Load data from files
@@ -140,21 +213,21 @@ Use the following code to load the data and create a raw 
DataFrame:
 
 === "Scala/Java"
        ```scala
-       var rawDf = sparkSession.read.format("csv").option("delimiter", 
"\t").option("header", "false").load("/Download/usa-county.tsv")
+       var rawDf = sedona.read.format("csv").option("delimiter", 
"\t").option("header", "false").load("/Download/usa-county.tsv")
        rawDf.createOrReplaceTempView("rawdf")
        rawDf.show()
        ```
 
 === "Java"
        ```java
-       Dataset<Row> rawDf = 
sparkSession.read.format("csv").option("delimiter", "\t").option("header", 
"false").load("/Download/usa-county.tsv")
+       Dataset<Row> rawDf = sedona.read.format("csv").option("delimiter", 
"\t").option("header", "false").load("/Download/usa-county.tsv")
        rawDf.createOrReplaceTempView("rawdf")
        rawDf.show()
        ```
 
 === "Python"
        ```python
-       rawDf = sparkSession.read.format("csv").option("delimiter", 
"\t").option("header", "false").load("/Download/usa-county.tsv")
+       rawDf = sedona.read.format("csv").option("delimiter", 
"\t").option("header", "false").load("/Download/usa-county.tsv")
        rawDf.createOrReplaceTempView("rawdf")
        rawDf.show()
        ```
@@ -224,7 +297,7 @@ This prevents Spark from interpreting the property and 
allows us to use the ST_G
 
        ```scala
        val schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>"
-       sparkSession.read.schema(schema).json(geojson_path)
+       sedona.read.schema(schema).json(geojson_path)
                .selectExpr("explode(features) as features") // Explode the 
envelope to get one feature per row.
                .select("features.*") // Unpack the features struct.
                .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)")) 
// Convert the geometry string.
@@ -235,7 +308,7 @@ This prevents Spark from interpreting the property and 
allows us to use the ST_G
 
        ```java
        String schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>";
-       sparkSession.read.schema(schema).json(geojson_path)
+       sedona.read.schema(schema).json(geojson_path)
                .selectExpr("explode(features) as features") // Explode the 
envelope to get one feature per row.
                .select("features.*") // Unpack the features struct.
                .withColumn("geometry", expr("ST_GeomFromGeoJSON(geometry)")) 
// Convert the geometry string.
@@ -246,7 +319,7 @@ This prevents Spark from interpreting the property and 
allows us to use the ST_G
 
        ```python
        schema = "type string, crs string, totalFeatures long, features 
array<struct<type string, geometry string, properties map<string, string>>>";
-       (sparkSession.read.json(geojson_path, schema=schema) 
+       (sedona.read.json(geojson_path, schema=schema) 
                .selectExpr("explode(features) as features") # Explode the 
envelope to get one feature per row.
                .select("features.*") # Unpack the features struct.
                .withColumn("geometry", f.expr("ST_GeomFromGeoJSON(geometry)")) 
# Convert the geometry string.
@@ -265,21 +338,21 @@ Since v`1.3.0`, Sedona natively supports loading 
GeoParquet file. Sedona will in
 === "Scala/Java"
 
        ```scala
-       val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation1)
+       val df = sedona.read.format("geoparquet").load(geoparquetdatalocation1)
        df.printSchema()
        ```
 
 === "Java"
 
        ```java
-       Dataset<Row> df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation1)
+       Dataset<Row> df = 
sedona.read().format("geoparquet").load(geoparquetdatalocation1)
        df.printSchema()
        ```
 
 === "Python"
 
        ```python
-       df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation1)
+       df = sedona.read.format("geoparquet").load(geoparquetdatalocation1)
        df.printSchema()
        ```
 
@@ -307,14 +380,14 @@ For Postgis there is no need to add a query to convert 
geometry types since it's
 
        ```scala
        // For any JDBC data source, including Postgis.
-       val df = sparkSession.read.format("jdbc")
+       val df = sedona.read.format("jdbc")
                // Other options.
                .option("query", "SELECT id, ST_AsBinary(geom) as geom FROM 
my_table")
                .load()
                .withColumn("geom", expr("ST_GeomFromWKB(geom)"))
 
        // This is a simplified version that works for Postgis.
-       val df = sparkSession.read.format("jdbc")
+       val df = sedona.read.format("jdbc")
                // Other options.
                .option("dbtable", "my_table")
                .load()
@@ -325,14 +398,14 @@ For Postgis there is no need to add a query to convert 
geometry types since it's
 
        ```java
        // For any JDBC data source, including Postgis.
-       Dataset<Row> df = sparkSession.read().format("jdbc")
+       Dataset<Row> df = sedona.read().format("jdbc")
                // Other options.
                .option("query", "SELECT id, ST_AsBinary(geom) as geom FROM 
my_table")
                .load()
                .withColumn("geom", expr("ST_GeomFromWKB(geom)"))
 
        // This is a simplified version that works for Postgis.
-       Dataset<Row> df = sparkSession.read().format("jdbc")
+       Dataset<Row> df = sedona.read().format("jdbc")
                // Other options.
                .option("dbtable", "my_table")
                .load()
@@ -343,14 +416,14 @@ For Postgis there is no need to add a query to convert 
geometry types since it's
 
        ```python
        # For any JDBC data source, including Postgis.
-       df = (sparkSession.read.format("jdbc")
+       df = (sedona.read.format("jdbc")
                # Other options.
                .option("query", "SELECT id, ST_AsBinary(geom) as geom FROM 
my_table")
                .load()
                .withColumn("geom", f.expr("ST_GeomFromWKB(geom)")))
 
        # This is a simplified version that works for Postgis.
-       df = (sparkSession.read.format("jdbc")
+       df = (sedona.read.format("jdbc")
                # Other options.
                .option("dbtable", "my_table")
                .load()
@@ -526,13 +599,13 @@ Use SedonaSQL DataFrame-RDD Adapter to convert a 
DataFrame to an SpatialRDD. Ple
 === "Scala"
 
        ```scala
-       var spatialDf = Adapter.toDf(spatialRDD, sparkSession)
+       var spatialDf = Adapter.toDf(spatialRDD, sedona)
        ```
 
 === "Java"
 
        ```java
-       Dataset<Row> spatialDf = Adapter.toDf(spatialRDD, sparkSession)
+       Dataset<Row> spatialDf = Adapter.toDf(spatialRDD, sedona)
        ```
        
 === "Python"
@@ -540,7 +613,7 @@ Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame 
to an SpatialRDD. Ple
        ```python
        from sedona.utils.adapter import Adapter
        
-       spatialDf = Adapter.toDf(spatialRDD, sparkSession)
+       spatialDf = Adapter.toDf(spatialRDD, sedona)
        ```
 
 All other attributes such as price and age will be also brought to the 
DataFrame as long as you specify ==carryOtherAttributes== (see [Read other 
attributes in an SpatialRDD](../rdd#read-other-attributes-in-an-spatialrdd)).
@@ -559,7 +632,7 @@ case. At least one column for the user data must be 
provided.
          StructField("price", DoubleType, nullable = true),
          StructField("age", IntegerType, nullable = true)
        ))
-       val spatialDf = Adapter.toDf(spatialRDD, schema, sparkSession)
+       val spatialDf = Adapter.toDf(spatialRDD, schema, sedona)
        ```
 
 ### SpatialPairRDD to DataFrame
@@ -569,7 +642,7 @@ PairRDD is the result of a spatial join query or distance 
join query. SedonaSQL
 === "Scala"
 
        ```scala
-       var joinResultDf = Adapter.toDf(joinResultPairRDD, 
Seq("left_attribute1", "left_attribute2"), Seq("right_attribute1", 
"right_attribute2"), sparkSession)
+       var joinResultDf = Adapter.toDf(joinResultPairRDD, 
Seq("left_attribute1", "left_attribute2"), Seq("right_attribute1", 
"right_attribute2"), sedona)
        ```
 
 === "Java"
@@ -579,7 +652,7 @@ PairRDD is the result of a spatial join query or distance 
join query. SedonaSQL
        
        List leftFields = new ArrayList<>(Arrays.asList("c1", "c2", "c3"));
        List rightFields = new ArrayList<>(Arrays.asList("c4", "c5", "c6"));
-       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftFields).toSeq(), 
JavaConverters.asScalaBuffer(rightFields).toSeq(), sparkSession);
+       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftFields).toSeq(), 
JavaConverters.asScalaBuffer(rightFields).toSeq(), sedona);
        ```
 
 === "Python"
@@ -595,14 +668,14 @@ or you can use the attribute names directly from the 
input RDD
 
        ```scala
        import scala.collection.JavaConversions._
-       var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, 
rightRdd.fieldNames, sparkSession)
+       var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, 
rightRdd.fieldNames, sedona)
        ```
 
 === "Java"
 
        ```java
        import scala.collection.JavaConverters; 
-       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftRdd.fieldNames).toSeq(), 
JavaConverters.asScalaBuffer(rightRdd.fieldNames).toSeq(), sparkSession);
+       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftRdd.fieldNames).toSeq(), 
JavaConverters.asScalaBuffer(rightRdd.fieldNames).toSeq(), sedona);
        ```
 === "Python"
 
@@ -630,5 +703,5 @@ case. Columns for the left and right user data must be 
provided.
          StructField("rightGeometry", GeometryUDT, nullable = true),
          StructField("category", StringType, nullable = true)
        ))
-       val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sparkSession)
+       val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sedona)
        ```
diff --git a/docs/tutorial/viz.md b/docs/tutorial/viz.md
index 48bdc371..cd48ab07 100644
--- a/docs/tutorial/viz.md
+++ b/docs/tutorial/viz.md
@@ -20,10 +20,23 @@ This tutorial mainly focuses on explaining SQL/DataFrame 
API. SedonaViz RDD exam
 1. Read [Sedona Maven Central coordinates](../setup/maven-coordinates.md)
 2. Add [Apache Spark 
core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11), 
[Apache 
SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql), 
Sedona-core, Sedona-SQL, Sedona-Viz
 
-## Initiate SparkSession
+## Create Sedona config
 
-Use the following code to initiate your SparkSession at the beginning:
-This will register SedonaViz Kryo serializer.
+Use the following code to create your Sedona config at the beginning. If you 
already have a SparkSession (usually named `spark`) created by Wherobots/AWS 
EMR/Databricks, please skip this step and use `spark` directly.
+
+==Sedona >= 1.4.1==
+
+```scala
+val config = SedonaContext.config()
+               .config("spark.kryo.registrator", 
classOf[SedonaVizKryoRegistrator].getName) // 
org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+               .master("local[*]") // Delete this if run in cluster mode
+               .appName("Sedona Viz") // Change this to a proper name
+               .getOrCreate()
+```
+
+==Sedona <1.4.1==
+
+The following method has been deprecated since Sedona 1.4.1. Please use the 
method above to create your Sedona config.
 
 ```scala
 var sparkSession = SparkSession.builder()
@@ -35,17 +48,26 @@ var sparkSession = SparkSession.builder()
 .getOrCreate()
 ```
 
-## Register SedonaSQL and SedonaViz
+## Initiate SedonaContext
+
+Add the following lines after creating the Sedona config. If you already have a 
SparkSession (usually named `spark`) created by Wherobots/AWS EMR/Databricks, 
please call `SedonaContext.create(spark)` instead.
 
-Add the following line after your SparkSession declaration
+==Sedona >= 1.4.1==
+
+```scala
+val sedona = SedonaContext.create(config)
+SedonaVizRegistrator.registerAll(sedona)
+```
+
+==Sedona <1.4.1==
+
+The following method has been deprecated since Sedona 1.4.1. Please use the 
method above to create your SedonaContext.
 
 ```scala
 SedonaSQLRegistrator.registerAll(sparkSession)
 SedonaVizRegistrator.registerAll(sparkSession)
 ```
 
-This will register all User Defined Tyeps, functions and optimizations in 
SedonaSQL and SedonaViz.
-
 You can also register everything by passing `--conf 
spark.sql.extensions=org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions`
 to `spark-submit` or `spark-shell`.
 
 ## Create Spatial DataFrame
@@ -153,7 +175,7 @@ This DataFrame will contain a Image type column which has 
only one image.
 Fetch the image from the previous DataFrame
 
 ```
-var image = 
sparkSession.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
+var image = 
sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
 ```
 
 Use Sedona Viz ImageGenerator to store this image on disk.
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java 
b/flink/src/main/java/org/apache/sedona/flink/SedonaContext.java
similarity index 56%
copy from 
flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
copy to flink/src/main/java/org/apache/sedona/flink/SedonaContext.java
index ef55cdd6..01eda47f 100644
--- a/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
+++ b/flink/src/main/java/org/apache/sedona/flink/SedonaContext.java
@@ -1,15 +1,20 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.sedona.flink;
 
@@ -18,24 +23,29 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.sedona.common.geometryObjects.Circle;
 import org.apache.sedona.common.geometrySerde.GeometrySerde;
 import org.apache.sedona.common.geometrySerde.SpatialIndexSerde;
-import org.locationtech.jts.geom.*;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.GeometryCollection;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.MultiLineString;
+import org.locationtech.jts.geom.MultiPoint;
+import org.locationtech.jts.geom.MultiPolygon;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.Polygon;
 import org.locationtech.jts.index.quadtree.Quadtree;
 import org.locationtech.jts.index.strtree.STRtree;
 
 import java.util.Arrays;
 
-public class SedonaFlinkRegistrator {
-
-    public static void registerFunc(StreamTableEnvironment tblEnv) {
-        Arrays.stream(Catalog.getFuncs()).forEach(
-                func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
-        );
-        Arrays.stream(Catalog.getPredicates()).forEach(
-                func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
-        );
-    }
-
-    public static void registerType(StreamExecutionEnvironment env) {
+public class SedonaContext
+{
+    /**
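+     * This is the entry point of the entire Sedona system: it registers Sedona's Kryo serializers on {@code env} and Sedona's functions and predicates on {@code tblEnv}.
+     * @param env execution environment whose config receives the geometry and spatial-index Kryo serializers
+     * @param tblEnv table environment in which each Sedona function and predicate is registered as a temporary system function
+     * @return the same {@code tblEnv} instance that was passed in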
+     */
+    public static StreamTableEnvironment create(StreamExecutionEnvironment 
env, StreamTableEnvironment tblEnv)
+    {
         GeometrySerde serializer = new GeometrySerde();
         SpatialIndexSerde indexSerializer = new SpatialIndexSerde(serializer);
         env.getConfig().registerTypeWithKryoSerializer(Point.class, 
serializer);
@@ -49,5 +59,13 @@ public class SedonaFlinkRegistrator {
         env.getConfig().registerTypeWithKryoSerializer(Envelope.class, 
serializer);
         env.getConfig().registerTypeWithKryoSerializer(Quadtree.class, 
indexSerializer);
         env.getConfig().registerTypeWithKryoSerializer(STRtree.class, 
indexSerializer);
+
+        Arrays.stream(Catalog.getFuncs()).forEach(
+                func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
+        );
+        Arrays.stream(Catalog.getPredicates()).forEach(
+                func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
+        );
+        return tblEnv;
     }
 }
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java 
b/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
index ef55cdd6..842016db 100644
--- a/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
+++ b/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
@@ -26,6 +26,10 @@ import java.util.Arrays;
 
 public class SedonaFlinkRegistrator {
 
+    /**
+     * @deprecated use {@link SedonaContext#create(StreamExecutionEnvironment, 
StreamTableEnvironment)}
+     */
+    @Deprecated
     public static void registerFunc(StreamTableEnvironment tblEnv) {
         Arrays.stream(Catalog.getFuncs()).forEach(
                 func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
@@ -34,7 +38,10 @@ public class SedonaFlinkRegistrator {
                 func -> 
tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
         );
     }
-
+    /**
+     * @deprecated use {@link SedonaContext#create(StreamExecutionEnvironment, 
StreamTableEnvironment)}
+     */
+    @Deprecated
     public static void registerType(StreamExecutionEnvironment env) {
         GeometrySerde serializer = new GeometrySerde();
         SpatialIndexSerde indexSerializer = new SpatialIndexSerde(serializer);
diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java 
b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
index 1148f715..a42e719a 100644
--- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java
+++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
@@ -79,9 +79,7 @@ public class TestBase {
         env = enableWebUI? 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()):
                 StreamExecutionEnvironment.getExecutionEnvironment();
         EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
-        tableEnv = StreamTableEnvironment.create(env, settings);
-        SedonaFlinkRegistrator.registerType(env);
-        SedonaFlinkRegistrator.registerFunc(tableEnv);
+        tableEnv = SedonaContext.create(env, 
StreamTableEnvironment.create(env, settings));
     }
 
     static List<Row> createPointText(int size){
diff --git a/python/sedona/core/jvm/config.py b/python/sedona/core/jvm/config.py
index 372b5ae5..3350fa87 100644
--- a/python/sedona/core/jvm/config.py
+++ b/python/sedona/core/jvm/config.py
@@ -23,7 +23,11 @@ from typing import Any, Optional, Tuple
 from py4j.protocol import Py4JJavaError
 from pyspark.sql import SparkSession
 from sedona.utils.decorators import classproperty
+import functools
+import inspect
+import warnings
 
+string_types = (type(b''), type(u''))
 
 def is_greater_or_equal_version(version_a: str, version_b: str) -> bool:
     if all([version_b, version_a]):
@@ -59,20 +63,77 @@ def since(version: str):
     return wrapper
 
 
-def depreciated(version: str, substitute: str):
-    def wrapper(function):
-        def applier(*args, **kwargs):
-            result = function(*args, **kwargs)
-            sedona_version = SedonaMeta.version
-            if sedona_version >= version:
-                logging.warning("Function is depreciated")
-                if substitute:
-                    logging.warning(f"Please use {substitute} instead")
-            return result
+def deprecated(reason):
+    """
+    This is a decorator which can be used to mark functions
+    as deprecated. It will result in a warning being emitted
+    when the function is used.
+    """
+
+    if isinstance(reason, string_types):
+
+        # The @deprecated is used with a 'reason'.
+        #
+        # .. code-block:: python
+        #
+        #    @deprecated("please, use another function")
+        #    def old_function(x, y):
+        #      pass
+
+        def decorator(func1):
+
+            if inspect.isclass(func1):
+                fmt1 = "Call to deprecated class {name} ({reason})."
+            else:
+                fmt1 = "Call to deprecated function {name} ({reason})."
+
+            @functools.wraps(func1)
+            def new_func1(*args, **kwargs):
+                warnings.simplefilter('always', DeprecationWarning)
+                warnings.warn(
+                    fmt1.format(name=func1.__name__, reason=reason),
+                    category=DeprecationWarning,
+                    stacklevel=2
+                )
+                warnings.simplefilter('default', DeprecationWarning)
+                return func1(*args, **kwargs)
 
-        return applier
+            return new_func1
 
-    return wrapper
+        return decorator
+
+    elif inspect.isclass(reason) or inspect.isfunction(reason):
+
+        # The @deprecated is used without any 'reason'.
+        #
+        # .. code-block:: python
+        #
+        #    @deprecated
+        #    def old_function(x, y):
+        #      pass
+
+        func2 = reason
+
+        if inspect.isclass(func2):
+            fmt2 = "Call to deprecated class {name}."
+        else:
+            fmt2 = "Call to deprecated function {name}."
+
+        @functools.wraps(func2)
+        def new_func2(*args, **kwargs):
+            warnings.simplefilter('always', DeprecationWarning)
+            warnings.warn(
+                fmt2.format(name=func2.__name__),
+                category=DeprecationWarning,
+                stacklevel=2
+            )
+            warnings.simplefilter('default', DeprecationWarning)
+            return func2(*args, **kwargs)
+
+        return new_func2
+
+    else:
+        raise TypeError(repr(type(reason)))
 
 
 class SparkJars:
diff --git a/python/sedona/register/geo_registrator.py 
b/python/sedona/register/geo_registrator.py
index 5a50a796..36566863 100644
--- a/python/sedona/register/geo_registrator.py
+++ b/python/sedona/register/geo_registrator.py
@@ -16,9 +16,10 @@
 #  under the License.
 
 import attr
-from pyspark.sql import SparkSession
 from py4j.java_gateway import java_import
+from pyspark.sql import SparkSession
 
+from sedona.core.jvm.config import deprecated
 from sedona.register.java_libs import SedonaJvmLib
 from sedona.utils.prep import assign_all
 
@@ -30,6 +31,7 @@ jvm_import = str
 class SedonaRegistrator:
 
     @classmethod
+    @deprecated("Deprecated since 1.4.1, use SedonaContext.create() instead.")
     def registerAll(cls, spark: SparkSession) -> bool:
         """
         This is the core of whole package, It uses py4j to run wrapper which 
takes existing SparkSession
@@ -44,10 +46,10 @@ class SedonaRegistrator:
         return True
 
     @classmethod
+    @deprecated("Deprecated since 1.4.1, use SedonaContext.create() instead.")
     def register(cls, spark: SparkSession):
         return 
spark._jvm.SedonaSQLRegistrator.registerAll(spark._jsparkSession)
 
-
 class PackageImporter:
 
     @staticmethod
@@ -62,4 +64,4 @@ class PackageImporter:
             java_import(jvm, lib.value)
             ImportedJvmLib.import_lib(lib.name)
 
-        return True
+        return True
\ No newline at end of file
diff --git a/python/sedona/register/java_libs.py 
b/python/sedona/register/java_libs.py
index 7db3c443..6489cc4f 100644
--- a/python/sedona/register/java_libs.py
+++ b/python/sedona/register/java_libs.py
@@ -52,6 +52,7 @@ class SedonaJvmLib(Enum):
     st_functions = "org.apache.spark.sql.sedona_sql.expressions.st_functions"
     st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates"
     st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates"
+    SedonaContext = "org.apache.sedona.spark.SedonaContext"
 
     @classmethod
     def from_str(cls, geo_lib: str) -> 'SedonaJvmLib':
diff --git a/python/sedona/register/geo_registrator.py 
b/python/sedona/spark/SedonaContext.py
similarity index 55%
copy from python/sedona/register/geo_registrator.py
copy to python/sedona/spark/SedonaContext.py
index 5a50a796..abba09c0 100644
--- a/python/sedona/register/geo_registrator.py
+++ b/python/sedona/spark/SedonaContext.py
@@ -17,49 +17,34 @@
 
 import attr
 from pyspark.sql import SparkSession
-from py4j.java_gateway import java_import
 
-from sedona.register.java_libs import SedonaJvmLib
-from sedona.utils.prep import assign_all
-
-assign_all()
-jvm_import = str
+from sedona.register.geo_registrator import PackageImporter
+from sedona.utils import KryoSerializer, SedonaKryoRegistrator
 
 
 @attr.s
-class SedonaRegistrator:
-
+class SedonaContext:
     @classmethod
-    def registerAll(cls, spark: SparkSession) -> bool:
+    def create(cls, spark: SparkSession) -> SparkSession:
         """
         This is the core of whole package, It uses py4j to run wrapper which 
takes existing SparkSession
-        and register all User Defined Functions by Apache Sedona developers, 
for this SparkSession.
+        and register the core logics of Apache Sedona, for this SparkSession.
 
         :param spark: pyspark.sql.SparkSession, spark session instance
-        :return: bool, True if registration was correct.
+        :return: SedonaContext which is an instance of SparkSession
         """
         spark.sql("SELECT 1 as geom").count()
         PackageImporter.import_jvm_lib(spark._jvm)
-        cls.register(spark)
-        return True
+        spark._jvm.SedonaContext.create(spark._jsparkSession)
+        return spark
 
     @classmethod
-    def register(cls, spark: SparkSession):
-        return 
spark._jvm.SedonaSQLRegistrator.registerAll(spark._jsparkSession)
-
-
-class PackageImporter:
-
-    @staticmethod
-    def import_jvm_lib(jvm) -> bool:
-        from sedona.core.utils import ImportedJvmLib
+    def config(cls) -> SparkSession.builder:
         """
-        Imports all the specified methods and functions in jvm
-        :param jvm: Jvm gateway from py4j
-        :return:
+        This method adds the basic Sedona configuration to the SparkSession 
builder.
+        Usually the user does not need to call this method directly, as it is 
configured when a cluster is created.
+        This method is needed when the user wants to manually configure Sedona
+        :return: SparkSession.builder
         """
-        for lib in SedonaJvmLib:
-            java_import(jvm, lib.value)
-            ImportedJvmLib.import_lib(lib.name)
-
-        return True
+        return SparkSession.builder.config("spark.serializer", 
KryoSerializer.getName).\
+            config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
\ No newline at end of file
diff --git a/python/sedona/spark/__init__.py b/python/sedona/spark/__init__.py
new file mode 100644
index 00000000..3cf6e8fa
--- /dev/null
+++ b/python/sedona/spark/__init__.py
@@ -0,0 +1,41 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from sedona.core.SpatialRDD import SpatialRDD
+from sedona.core.SpatialRDD import PointRDD
+from sedona.core.SpatialRDD import PolygonRDD
+from sedona.core.SpatialRDD import LineStringRDD
+from sedona.core.SpatialRDD import CircleRDD
+from sedona.core.SpatialRDD import RectangleRDD
+from sedona.core.spatialOperator import KNNQuery
+from sedona.core.spatialOperator import JoinQueryRaw
+from sedona.core.spatialOperator import JoinQuery
+from sedona.core.spatialOperator import RangeQueryRaw
+from sedona.core.spatialOperator import RangeQuery
+from sedona.core.formatMapper.shapefileParser import ShapefileReader
+from sedona.core.formatMapper import GeoJsonReader
+from sedona.core.formatMapper import WktReader
+from sedona.core.formatMapper import WkbReader
+from sedona.core.enums import IndexType
+from sedona.core.enums import GridType
+from sedona.core.enums import FileDataSplitter
+from sedona.sql.types import GeometryType
+from sedona.utils.adapter import Adapter
+from sedona.utils import KryoSerializer
+from sedona.utils import SedonaKryoRegistrator
+from sedona.register import SedonaRegistrator
+from sedona.spark.SedonaContext import SedonaContext
\ No newline at end of file
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index e4da6dc6..433120de 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -15,10 +15,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from pyspark.sql import SparkSession
-
-from sedona.register import SedonaRegistrator
-from sedona.utils import KryoSerializer, SedonaKryoRegistrator
+from sedona.spark import *
 from sedona.utils.decorators import classproperty
 
 
@@ -27,15 +24,7 @@ class TestBase:
     @classproperty
     def spark(self):
         if not hasattr(self, "__spark"):
-            spark = SparkSession. \
-                builder. \
-                config("spark.serializer", KryoSerializer.getName).\
-                config("spark.kryo.registrator", 
SedonaKryoRegistrator.getName) .\
-                master("local[*]").\
-                getOrCreate()
-
-            SedonaRegistrator.registerAll(spark)
-
+            spark = 
SedonaContext.create(SedonaContext.config().master("local[*]").getOrCreate())
             setattr(self, "__spark", spark)
         return getattr(self, "__spark")
 
diff --git 
a/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
 b/sql/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
similarity index 64%
copy from 
sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
copy to sql/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 91871eb9..7ddf9050 100644
--- 
a/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -16,20 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sedona.sql.utils
+package org.apache.sedona.spark
 
+import org.apache.sedona.core.serde.SedonaKryoRegistrator
 import org.apache.sedona.sql.UDF.UdfRegistrator
 import org.apache.sedona.sql.UDT.UdtRegistrator
+import org.apache.spark.serializer.KryoSerializer
 import 
org.apache.spark.sql.sedona_sql.optimization.SpatialFilterPushDownForGeoParquet
-import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
+import org.apache.spark.sql.{SQLContext, SparkSession}
 
-object SedonaSQLRegistrator {
-  def registerAll(sqlContext: SQLContext): Unit = {
-    registerAll(sqlContext.sparkSession)
+object SedonaContext {
+  def create(sqlContext: SQLContext): SQLContext = {
+    create(sqlContext.sparkSession)
+    sqlContext
   }
 
-  def registerAll(sparkSession: SparkSession): Unit = {
+  /**
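+    * This is the entry point of the entire Sedona system: it adds the Sedona join strategy (if not already present) and registers Sedona's UDTs and UDFs.
+    * @param sparkSession the SparkSession on which Sedona is registered
+    * @return the same sparkSession instance that was passed in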
+    */
+  def create(sparkSession: SparkSession):SparkSession = {
     if 
(!sparkSession.experimental.extraStrategies.exists(_.isInstanceOf[JoinQueryDetector]))
 {
       sparkSession.experimental.extraStrategies ++= Seq(new 
JoinQueryDetector(sparkSession))
     }
@@ -38,9 +46,17 @@ object SedonaSQLRegistrator {
     }
     UdtRegistrator.registerAll()
     UdfRegistrator.registerAll(sparkSession)
+    sparkSession
   }
 
-  def dropAll(sparkSession: SparkSession): Unit = {
-    UdfRegistrator.dropAll(sparkSession)
+  /**
+    * This method adds the basic Sedona configurations to the SparkSession
+    * Usually the user does not need to call this method directly
+    * This is only needed when the user needs to manually configure Sedona
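+    * @return a SparkSession.Builder with spark.serializer set to KryoSerializer and spark.kryo.registrator set to SedonaKryoRegistrator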
+    */
+  def config(): SparkSession.Builder = {
+    SparkSession.builder().config("spark.serializer", 
classOf[KryoSerializer].getName).
+      config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName)
   }
 }
diff --git 
a/sql/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala 
b/sql/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
index e3b741a0..40426467 100644
--- a/sql/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
@@ -18,14 +18,14 @@
  */
 package org.apache.sedona.sql
 
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
+import org.apache.sedona.spark.SedonaContext
 import org.apache.spark.sql.SparkSessionExtensions
 
 
 class SedonaSqlExtensions extends (SparkSessionExtensions => Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
     e.injectCheckRule(spark => {
-      SedonaSQLRegistrator.registerAll(spark)
+      SedonaContext.create(spark)
       _ => ()
     })
   }
diff --git 
a/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
 
b/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
index 91871eb9..6673054e 100644
--- 
a/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
+++ 
b/sql/common/src/main/scala/org/apache/sedona/sql/utils/SedonaSQLRegistrator.scala
@@ -18,27 +18,20 @@
  */
 package org.apache.sedona.sql.utils
 
+import org.apache.sedona.spark.SedonaContext
 import org.apache.sedona.sql.UDF.UdfRegistrator
-import org.apache.sedona.sql.UDT.UdtRegistrator
-import 
org.apache.spark.sql.sedona_sql.optimization.SpatialFilterPushDownForGeoParquet
 import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
 
+@deprecated("Use SedonaContext instead", "1.4.1")
 object SedonaSQLRegistrator {
+  @deprecated("Use SedonaContext.create instead", "1.4.1")
   def registerAll(sqlContext: SQLContext): Unit = {
-    registerAll(sqlContext.sparkSession)
+    SedonaContext.create(sqlContext.sparkSession)
   }
 
-  def registerAll(sparkSession: SparkSession): Unit = {
-    if 
(!sparkSession.experimental.extraStrategies.exists(_.isInstanceOf[JoinQueryDetector]))
 {
-      sparkSession.experimental.extraStrategies ++= Seq(new 
JoinQueryDetector(sparkSession))
-    }
-    if 
(!sparkSession.experimental.extraOptimizations.exists(_.isInstanceOf[SpatialFilterPushDownForGeoParquet]))
 {
-      sparkSession.experimental.extraOptimizations ++= Seq(new 
SpatialFilterPushDownForGeoParquet(sparkSession))
-    }
-    UdtRegistrator.registerAll()
-    UdfRegistrator.registerAll(sparkSession)
-  }
+  @deprecated("Use SedonaContext.create instead", "1.4.1")
+  def registerAll(sparkSession: SparkSession): Unit =
+    SedonaContext.create(sparkSession)
 
   def dropAll(sparkSession: SparkSession): Unit = {
     UdfRegistrator.dropAll(sparkSession)
diff --git 
a/sql/common/src/test/java/org/apache/sedona/sql/adapterTestJava.java 
b/sql/common/src/test/java/org/apache/sedona/sql/adapterTestJava.java
index 793cda50..9c7c17b4 100644
--- a/sql/common/src/test/java/org/apache/sedona/sql/adapterTestJava.java
+++ b/sql/common/src/test/java/org/apache/sedona/sql/adapterTestJava.java
@@ -24,12 +24,11 @@ import org.apache.log4j.Logger;
 import org.apache.sedona.core.enums.GridType;
 import org.apache.sedona.core.enums.IndexType;
 import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader;
-import org.apache.sedona.core.serde.SedonaKryoRegistrator;
 import org.apache.sedona.core.spatialOperator.JoinQuery;
 import org.apache.sedona.core.spatialRDD.CircleRDD;
 import org.apache.sedona.core.spatialRDD.SpatialRDD;
+import org.apache.sedona.spark.SedonaContext;
 import org.apache.sedona.sql.utils.Adapter;
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -60,15 +59,9 @@ public class adapterTestJava
     @BeforeClass
     public static void onceExecutedBeforeAll()
     {
-        conf = new 
SparkConf().setAppName("adapterTestJava").setMaster("local[2]");
-        conf.set("spark.serializer", 
org.apache.spark.serializer.KryoSerializer.class.getName());
-        conf.set("spark.kryo.registrator", 
SedonaKryoRegistrator.class.getName());
-
-        sc = new JavaSparkContext(conf);
-        sparkSession = new SparkSession(sc.sc());
+        sparkSession = 
SedonaContext.create(SedonaContext.config().master("local[*]").appName("adapterTestJava").getOrCreate());
         Logger.getLogger("org").setLevel(Level.WARN);
         Logger.getLogger("akka").setLevel(Level.WARN);
-        SedonaSQLRegistrator.registerAll(sparkSession.sqlContext());
     }
 
     /**
@@ -77,7 +70,6 @@ public class adapterTestJava
     @AfterClass
     public static void TearDown()
     {
-        SedonaSQLRegistrator.dropAll(sparkSession);
         sparkSession.stop();
     }
 
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 16d97888..664196ce 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -21,10 +21,8 @@ package org.apache.sedona.sql
 import com.google.common.math.DoubleMath
 import org.apache.log4j.{Level, Logger}
 import org.apache.sedona.common.sphere.{Haversine, Spheroid}
-import org.apache.sedona.core.serde.SedonaKryoRegistrator
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.sedona.spark.SedonaContext
+import org.apache.spark.sql.DataFrame
 import org.locationtech.jts.geom.{CoordinateSequence, 
CoordinateSequenceComparator}
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
 
@@ -36,8 +34,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
 
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
-  val sparkSession = SparkSession.builder().config("spark.serializer", 
classOf[KryoSerializer].getName).
-    config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName).
+  val sparkSession = SedonaContext.config().
     master("local[*]").appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
     // We need to be explicit about broadcasting in tests.
@@ -70,7 +67,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   val spatialJoinRightInputLocation: String = resourceFolder + 
"spatial-join-query-window.tsv"
 
   override def beforeAll(): Unit = {
-    SedonaSQLRegistrator.registerAll(sparkSession)
+    SedonaContext.create(sparkSession)
   }
 
   override def afterAll(): Unit = {
diff --git 
a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index dfe6d04f..c2fa704b 100644
--- a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -19,10 +19,8 @@
 package org.apache.sedona.sql
 
 import org.apache.log4j.{Level, Logger}
-import org.apache.sedona.core.serde.SedonaKryoRegistrator
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.sedona.spark.SedonaContext
+import org.apache.spark.sql.DataFrame
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
 
 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
@@ -33,8 +31,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
 
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
-  val sparkSession = SparkSession.builder().config("spark.serializer", 
classOf[KryoSerializer].getName).
-    config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName).
+  val sparkSession = SedonaContext.config().
     master("local[*]").appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
     // We need to be explicit about broadcasting in tests.
@@ -44,7 +41,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   val resourceFolder = System.getProperty("user.dir") + 
"/../../core/src/test/resources/"
 
   override def beforeAll(): Unit = {
-    SedonaSQLRegistrator.registerAll(sparkSession)
+    SedonaContext.create(sparkSession)
   }
 
   override def afterAll(): Unit = {
diff --git 
a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index dfe6d04f..34511b58 100644
--- a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -19,10 +19,7 @@
 package org.apache.sedona.sql
 
 import org.apache.log4j.{Level, Logger}
-import org.apache.sedona.core.serde.SedonaKryoRegistrator
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
 
 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
@@ -33,8 +30,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
 
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
-  val sparkSession = SparkSession.builder().config("spark.serializer", 
classOf[KryoSerializer].getName).
-    config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName).
+  val sparkSession = SedonaContext.builder().
     master("local[*]").appName("sedonasqlScalaTest")
     .config("spark.sql.warehouse.dir", warehouseLocation)
     // We need to be explicit about broadcasting in tests.
@@ -44,7 +40,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
   val resourceFolder = System.getProperty("user.dir") + 
"/../../core/src/test/resources/"
 
   override def beforeAll(): Unit = {
-    SedonaSQLRegistrator.registerAll(sparkSession)
+    SedonaContext.create(sparkSession)
   }
 
   override def afterAll(): Unit = {
diff --git a/viz/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala 
b/viz/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala
index 75801d09..885e1630 100644
--- a/viz/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala
+++ b/viz/src/test/scala/org/apache/sedona/viz/sql/TestBaseScala.scala
@@ -19,10 +19,8 @@
 package org.apache.sedona.viz.sql
 
 import org.apache.log4j.{Level, Logger}
-import org.apache.sedona.sql.utils.SedonaSQLRegistrator
-import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+import org.apache.sedona.spark.SedonaContext
 import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
-import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
 
@@ -40,10 +38,8 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll{
   val csvPointInputLocation = resourceFolder + "arealm.csv"
 
   override def beforeAll(): Unit = {
-    spark = SparkSession.builder().config("spark.serializer", 
classOf[KryoSerializer].getName).
-      config("spark.kryo.registrator", 
classOf[SedonaVizKryoRegistrator].getName).
-      master("local[*]").appName("SedonaVizSQL").getOrCreate()
-    SedonaSQLRegistrator.registerAll(spark)
+    spark = SedonaContext.create(SedonaContext.config().
+      master("local[*]").appName("SedonaVizSQL").getOrCreate())
     SedonaVizRegistrator.registerAll(spark)
     getPoint().createOrReplaceTempView("pointtable")
     getPolygon().createOrReplaceTempView("usdata")

Reply via email to