This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new d5fe2b46 [SEDONA-208] Use Spark RuntimeConfig in SedonaConf (#722)
d5fe2b46 is described below

commit d5fe2b46d45215f1703cbb43a181d040fd43c28e
Author: Martin Andersson <[email protected]>
AuthorDate: Sat Dec 3 02:46:53 2022 +0100

    [SEDONA-208] Use Spark RuntimeConfig in SedonaConf (#722)
---
 .../org/apache/sedona/core/utils/SedonaConf.java   | 88 +++++++---------------
 .../apache/sedona/core/utils/SedonaConfTest.java   | 38 ++++++++++
 docs/api/sql/Parameter.md                          |  6 +-
 .../strategy/join/JoinQueryDetector.scala          |  2 +-
 .../strategy/join/TraitJoinQueryExec.scala         |  6 +-
 .../apache/sedona/sql/predicateJoinTestScala.scala |  4 +-
 6 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java b/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
index 1b8b2b76..b801d4d4 100644
--- a/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
+++ b/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
@@ -23,7 +23,8 @@ import org.apache.sedona.core.enums.GridType;
 import org.apache.sedona.core.enums.IndexType;
 import org.apache.sedona.core.enums.JoinBuildSide;
 import org.apache.sedona.core.enums.JoinSparitionDominantSide;
-import org.apache.spark.SparkConf;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
 import org.locationtech.jts.geom.Envelope;
 
 import java.io.Serializable;
@@ -35,46 +36,45 @@ public class SedonaConf
 
     // Global parameters of Sedona. All these parameters can be initialized 
through SparkConf.
 
-    private Boolean useIndex = false;
+    private boolean useIndex;
 
-    private IndexType indexType = IndexType.QUADTREE;
+    private IndexType indexType;
 
     // Parameters for JoinQuery including RangeJoin and DistanceJoin
 
-    private JoinSparitionDominantSide joinSparitionDominantSide = 
JoinSparitionDominantSide.LEFT;
+    private JoinSparitionDominantSide joinSparitionDominantSide;
 
-    private JoinBuildSide joinBuildSide = JoinBuildSide.LEFT;
+    private JoinBuildSide joinBuildSide;
 
-    private Long joinApproximateTotalCount = (long) -1;
+    private long joinApproximateTotalCount;
 
-    private Envelope datasetBoundary = new Envelope(0, 0, 0, 0);
+    private Envelope datasetBoundary;
 
-    private Integer fallbackPartitionNum = -1;
+    private int fallbackPartitionNum;
 
-    private GridType joinGridType = GridType.KDBTREE;
+    private GridType joinGridType;
 
-    public SedonaConf(SparkConf sparkConf)
-    {
-        this.useIndex = sparkConf.getBoolean("sedona.global.index", true);
-        this.indexType = 
IndexType.getIndexType(sparkConf.get("sedona.global.indextype", "quadtree"));
-        this.joinApproximateTotalCount = 
sparkConf.getLong("sedona.join.approxcount", -1);
-        String[] boundaryString = sparkConf.get("sedona.join.boundary", 
"0,0,0,0").split(",");
-        this.datasetBoundary = new 
Envelope(Double.parseDouble(boundaryString[0]), 
Double.parseDouble(boundaryString[0]),
-                Double.parseDouble(boundaryString[0]), 
Double.parseDouble(boundaryString[0]));
-        this.joinGridType = 
GridType.getGridType(sparkConf.get("sedona.join.gridtype", "kdbtree"));
-        this.joinBuildSide = 
JoinBuildSide.getBuildSide(sparkConf.get("sedona.join.indexbuildside", "left"));
-        this.joinSparitionDominantSide = 
JoinSparitionDominantSide.getJoinSparitionDominantSide(sparkConf.get("sedona.join.spatitionside",
 "left"));
-        this.fallbackPartitionNum = 
sparkConf.getInt("sedona.join.numpartition", -1);
+    public static SedonaConf fromActiveSession() {
+        return new SedonaConf(SparkSession.active().conf());
     }
 
-    public Boolean getUseIndex()
+    public SedonaConf(RuntimeConfig runtimeConfig)
     {
-        return useIndex;
+        this.useIndex = 
Boolean.parseBoolean(runtimeConfig.get("sedona.global.index", "true"));
+        this.indexType = 
IndexType.getIndexType(runtimeConfig.get("sedona.global.indextype", 
"quadtree"));
+        this.joinApproximateTotalCount = 
Long.parseLong(runtimeConfig.get("sedona.join.approxcount", "-1"));
+        String[] boundaryString = runtimeConfig.get("sedona.join.boundary", 
"0,0,0,0").split(",");
+        this.datasetBoundary = new 
Envelope(Double.parseDouble(boundaryString[0]), 
Double.parseDouble(boundaryString[1]),
+                Double.parseDouble(boundaryString[2]), 
Double.parseDouble(boundaryString[3]));
+        this.joinGridType = 
GridType.getGridType(runtimeConfig.get("sedona.join.gridtype", "kdbtree"));
+        this.joinBuildSide = 
JoinBuildSide.getBuildSide(runtimeConfig.get("sedona.join.indexbuildside", 
"left"));
+        this.joinSparitionDominantSide = 
JoinSparitionDominantSide.getJoinSparitionDominantSide(runtimeConfig.get("sedona.join.spatitionside",
 "left"));
+        this.fallbackPartitionNum = 
Integer.parseInt(runtimeConfig.get("sedona.join.numpartition", "-1"));
     }
 
-    public void setUseIndex(Boolean useIndex)
+    public boolean getUseIndex()
     {
-        this.useIndex = useIndex;
+        return useIndex;
     }
 
     public IndexType getIndexType()
@@ -82,71 +82,37 @@ public class SedonaConf
         return indexType;
     }
 
-    public void setIndexType(IndexType indexType)
-    {
-        this.indexType = indexType;
-    }
 
-    public Long getJoinApproximateTotalCount()
+    public long getJoinApproximateTotalCount()
     {
         return joinApproximateTotalCount;
     }
 
-    public void setJoinApproximateTotalCount(Long joinApproximateTotalCount)
-    {
-        this.joinApproximateTotalCount = joinApproximateTotalCount;
-    }
-
     public Envelope getDatasetBoundary()
     {
         return datasetBoundary;
     }
 
-    public void setDatasetBoundary(Envelope datasetBoundary)
-    {
-        this.datasetBoundary = datasetBoundary;
-    }
-
     public JoinBuildSide getJoinBuildSide()
     {
         return joinBuildSide;
     }
 
-    public void setJoinBuildSide(JoinBuildSide joinBuildSide)
-    {
-        this.joinBuildSide = joinBuildSide;
-    }
-
     public GridType getJoinGridType()
     {
         return joinGridType;
     }
 
-    public void setJoinGridType(GridType joinGridType)
-    {
-        this.joinGridType = joinGridType;
-    }
-
     public JoinSparitionDominantSide getJoinSparitionDominantSide()
     {
         return joinSparitionDominantSide;
     }
 
-    public void setJoinSparitionDominantSide(JoinSparitionDominantSide 
joinSparitionDominantSide)
-    {
-        this.joinSparitionDominantSide = joinSparitionDominantSide;
-    }
-
-    public Integer getFallbackPartitionNum()
+    public int getFallbackPartitionNum()
     {
         return fallbackPartitionNum;
     }
 
-    public void setFallbackPartitionNum(Integer fallbackPartitionNum)
-    {
-        this.fallbackPartitionNum = fallbackPartitionNum;
-    }
-
     public String toString()
     {
         try {
diff --git a/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java b/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
new file mode 100644
index 00000000..98907294
--- /dev/null
+++ b/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
@@ -0,0 +1,38 @@
+package org.apache.sedona.core.utils;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Envelope;
+
+import static org.junit.Assert.*;
+
+public class SedonaConfTest {
+
+    @BeforeClass
+    public static void setUp() {
+        SparkSession.builder().config("sedona.join.numpartition", 
"2").master("local").getOrCreate();
+
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        SparkSession.active().sparkContext().stop();
+    }
+
+    @Test
+    public void testRuntimeConf() {
+        assertEquals(2, 
SedonaConf.fromActiveSession().getFallbackPartitionNum());
+        SparkSession.active().conf().set("sedona.join.numpartition", "3");
+        assertEquals(3, 
SedonaConf.fromActiveSession().getFallbackPartitionNum());
+    }
+    
+    @Test
+    public void testDatasetBoundary() {
+        SparkSession.active().conf().set("sedona.join.boundary", "1,2,3,4");
+        Envelope datasetBoundary = 
SedonaConf.fromActiveSession().getDatasetBoundary();
+        assertEquals("Env[1.0 : 2.0, 3.0 : 4.0]", datasetBoundary.toString());
+    }
+}
\ No newline at end of file
diff --git a/docs/api/sql/Parameter.md b/docs/api/sql/Parameter.md
index 18dc92e6..11a01620 100644
--- a/docs/api/sql/Parameter.md
+++ b/docs/api/sql/Parameter.md
@@ -11,9 +11,13 @@ sparkSession = SparkSession.builder().
 ```
 2. Check your current SedonaSQL configuration:
 ```Scala
-val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+val sedonaConf = new SedonaConf(sparkSession.conf)
 println(sedonaConf)
 ```
+3. Sedona parameters can be changed at runtime:
+```Scala
+sparkSession.conf.set("sedona.global.index","false")
+```
 ## Explanation
 
 * sedona.global.index
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index fbd1f958..ce00499d 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -110,7 +110,7 @@ class JoinQueryDetector(sparkSession: SparkSession) extends 
Strategy {
           None
       }
 
-      val sedonaConf = new SedonaConf(sparkSession.sparkContext.conf)
+      val sedonaConf = new SedonaConf(sparkSession.conf)
 
       if ((broadcastLeft || broadcastRight) && sedonaConf.getUseIndex) {
         queryDetection match {
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
index eb912798..506d7033 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
@@ -52,7 +52,7 @@ trait TraitJoinQueryExec extends TraitJoinQueryBase {
     val leftResultsRaw = left.execute().asInstanceOf[RDD[UnsafeRow]]
     val rightResultsRaw = right.execute().asInstanceOf[RDD[UnsafeRow]]
 
-    val sedonaConf = new SedonaConf(sparkContext.conf)
+    val sedonaConf = SedonaConf.fromActiveSession
     val (leftShapes, rightShapes) =
       toSpatialRddPair(leftResultsRaw, boundLeftShape, rightResultsRaw, 
boundRightShape)
 
@@ -61,13 +61,9 @@ trait TraitJoinQueryExec extends TraitJoinQueryBase {
     if (sedonaConf.getJoinApproximateTotalCount == -1) {
       if (sedonaConf.getJoinSparitionDominantSide == 
JoinSparitionDominantSide.LEFT) {
         leftShapes.analyze()
-        
sedonaConf.setJoinApproximateTotalCount(leftShapes.approximateTotalCount)
-        sedonaConf.setDatasetBoundary(leftShapes.boundaryEnvelope)
       }
       else {
         rightShapes.analyze()
-        
sedonaConf.setJoinApproximateTotalCount(rightShapes.approximateTotalCount)
-        sedonaConf.setDatasetBoundary(rightShapes.boundaryEnvelope)
       }
     }
     log.info("[SedonaSQL] Number of partitions on the left: " + 
leftResultsRaw.partitions.size)
diff --git a/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index 48684bc5..596e561b 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -31,7 +31,7 @@ class predicateJoinTestScala extends TestBaseScala {
   describe("Sedona-SQL Predicate Join Test") {
 
     it("Passed ST_Contains in a join") {
-      val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+      val sedonaConf = new SedonaConf(sparkSession.conf)
       println(sedonaConf)
 
       var polygonCsvDf = sparkSession.read.format("csv").option("delimiter", 
",").option("header", "false").load(csvPolygonInputLocation)
@@ -129,7 +129,7 @@ class predicateJoinTestScala extends TestBaseScala {
     }
 
     it("Passed ST_Covers in a join") {
-      val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+      val sedonaConf = new SedonaConf(sparkSession.conf)
       println(sedonaConf)
 
       var polygonCsvDf = sparkSession.read.format("csv").option("delimiter", 
",").option("header", "false").load(csvPolygonInputLocation)

Reply via email to