This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new d5fe2b46 [SEDONA-208] Use Spark RuntimeConfig in SedonaConf (#722)
d5fe2b46 is described below
commit d5fe2b46d45215f1703cbb43a181d040fd43c28e
Author: Martin Andersson <[email protected]>
AuthorDate: Sat Dec 3 02:46:53 2022 +0100
[SEDONA-208] Use Spark RuntimeConfig in SedonaConf (#722)
---
.../org/apache/sedona/core/utils/SedonaConf.java | 88 +++++++---------------
.../apache/sedona/core/utils/SedonaConfTest.java | 38 ++++++++++
docs/api/sql/Parameter.md | 6 +-
.../strategy/join/JoinQueryDetector.scala | 2 +-
.../strategy/join/TraitJoinQueryExec.scala | 6 +-
.../apache/sedona/sql/predicateJoinTestScala.scala | 4 +-
6 files changed, 74 insertions(+), 70 deletions(-)
diff --git a/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java b/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
index 1b8b2b76..b801d4d4 100644
--- a/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
+++ b/core/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
@@ -23,7 +23,8 @@ import org.apache.sedona.core.enums.GridType;
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.enums.JoinSparitionDominantSide;
-import org.apache.spark.SparkConf;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
import org.locationtech.jts.geom.Envelope;
import java.io.Serializable;
@@ -35,46 +36,45 @@ public class SedonaConf
// Global parameters of Sedona. All these parameters can be initialized through SparkConf.
- private Boolean useIndex = false;
+ private boolean useIndex;
- private IndexType indexType = IndexType.QUADTREE;
+ private IndexType indexType;
// Parameters for JoinQuery including RangeJoin and DistanceJoin
- private JoinSparitionDominantSide joinSparitionDominantSide = JoinSparitionDominantSide.LEFT;
+ private JoinSparitionDominantSide joinSparitionDominantSide;
- private JoinBuildSide joinBuildSide = JoinBuildSide.LEFT;
+ private JoinBuildSide joinBuildSide;
- private Long joinApproximateTotalCount = (long) -1;
+ private long joinApproximateTotalCount;
- private Envelope datasetBoundary = new Envelope(0, 0, 0, 0);
+ private Envelope datasetBoundary;
- private Integer fallbackPartitionNum = -1;
+ private int fallbackPartitionNum;
- private GridType joinGridType = GridType.KDBTREE;
+ private GridType joinGridType;
- public SedonaConf(SparkConf sparkConf)
- {
- this.useIndex = sparkConf.getBoolean("sedona.global.index", true);
- this.indexType = IndexType.getIndexType(sparkConf.get("sedona.global.indextype", "quadtree"));
- this.joinApproximateTotalCount = sparkConf.getLong("sedona.join.approxcount", -1);
- String[] boundaryString = sparkConf.get("sedona.join.boundary", "0,0,0,0").split(",");
- this.datasetBoundary = new Envelope(Double.parseDouble(boundaryString[0]), Double.parseDouble(boundaryString[0]),
- Double.parseDouble(boundaryString[0]), Double.parseDouble(boundaryString[0]));
- this.joinGridType = GridType.getGridType(sparkConf.get("sedona.join.gridtype", "kdbtree"));
- this.joinBuildSide = JoinBuildSide.getBuildSide(sparkConf.get("sedona.join.indexbuildside", "left"));
- this.joinSparitionDominantSide = JoinSparitionDominantSide.getJoinSparitionDominantSide(sparkConf.get("sedona.join.spatitionside", "left"));
- this.fallbackPartitionNum = sparkConf.getInt("sedona.join.numpartition", -1);
+ public static SedonaConf fromActiveSession() {
+ return new SedonaConf(SparkSession.active().conf());
}
- public Boolean getUseIndex()
+ public SedonaConf(RuntimeConfig runtimeConfig)
{
- return useIndex;
+ this.useIndex = Boolean.parseBoolean(runtimeConfig.get("sedona.global.index", "true"));
+ this.indexType = IndexType.getIndexType(runtimeConfig.get("sedona.global.indextype", "quadtree"));
+ this.joinApproximateTotalCount = Long.parseLong(runtimeConfig.get("sedona.join.approxcount", "-1"));
+ String[] boundaryString = runtimeConfig.get("sedona.join.boundary", "0,0,0,0").split(",");
+ this.datasetBoundary = new Envelope(Double.parseDouble(boundaryString[0]), Double.parseDouble(boundaryString[1]),
+ Double.parseDouble(boundaryString[2]), Double.parseDouble(boundaryString[3]));
+ this.joinGridType = GridType.getGridType(runtimeConfig.get("sedona.join.gridtype", "kdbtree"));
+ this.joinBuildSide = JoinBuildSide.getBuildSide(runtimeConfig.get("sedona.join.indexbuildside", "left"));
+ this.joinSparitionDominantSide = JoinSparitionDominantSide.getJoinSparitionDominantSide(runtimeConfig.get("sedona.join.spatitionside", "left"));
+ this.fallbackPartitionNum = Integer.parseInt(runtimeConfig.get("sedona.join.numpartition", "-1"));
}
- public void setUseIndex(Boolean useIndex)
+ public boolean getUseIndex()
{
- this.useIndex = useIndex;
+ return useIndex;
}
public IndexType getIndexType()
@@ -82,71 +82,37 @@ public class SedonaConf
return indexType;
}
- public void setIndexType(IndexType indexType)
- {
- this.indexType = indexType;
- }
- public Long getJoinApproximateTotalCount()
+ public long getJoinApproximateTotalCount()
{
return joinApproximateTotalCount;
}
- public void setJoinApproximateTotalCount(Long joinApproximateTotalCount)
- {
- this.joinApproximateTotalCount = joinApproximateTotalCount;
- }
-
public Envelope getDatasetBoundary()
{
return datasetBoundary;
}
- public void setDatasetBoundary(Envelope datasetBoundary)
- {
- this.datasetBoundary = datasetBoundary;
- }
-
public JoinBuildSide getJoinBuildSide()
{
return joinBuildSide;
}
- public void setJoinBuildSide(JoinBuildSide joinBuildSide)
- {
- this.joinBuildSide = joinBuildSide;
- }
-
public GridType getJoinGridType()
{
return joinGridType;
}
- public void setJoinGridType(GridType joinGridType)
- {
- this.joinGridType = joinGridType;
- }
-
public JoinSparitionDominantSide getJoinSparitionDominantSide()
{
return joinSparitionDominantSide;
}
- public void setJoinSparitionDominantSide(JoinSparitionDominantSide joinSparitionDominantSide)
- {
- this.joinSparitionDominantSide = joinSparitionDominantSide;
- }
-
- public Integer getFallbackPartitionNum()
+ public int getFallbackPartitionNum()
{
return fallbackPartitionNum;
}
- public void setFallbackPartitionNum(Integer fallbackPartitionNum)
- {
- this.fallbackPartitionNum = fallbackPartitionNum;
- }
-
public String toString()
{
try {
diff --git a/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java b/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
new file mode 100644
index 00000000..98907294
--- /dev/null
+++ b/core/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
@@ -0,0 +1,38 @@
+package org.apache.sedona.core.utils;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Envelope;
+
+import static org.junit.Assert.*;
+
+public class SedonaConfTest {
+
+ @BeforeClass
+ public static void setUp() {
+ SparkSession.builder().config("sedona.join.numpartition", "2").master("local").getOrCreate();
+
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ SparkSession.active().sparkContext().stop();
+ }
+
+ @Test
+ public void testRuntimeConf() {
+ assertEquals(2, SedonaConf.fromActiveSession().getFallbackPartitionNum());
+ SparkSession.active().conf().set("sedona.join.numpartition", "3");
+ assertEquals(3, SedonaConf.fromActiveSession().getFallbackPartitionNum());
+ }
+
+ @Test
+ public void testDatasetBoundary() {
+ SparkSession.active().conf().set("sedona.join.boundary", "1,2,3,4");
+ Envelope datasetBoundary = SedonaConf.fromActiveSession().getDatasetBoundary();
+ assertEquals("Env[1.0 : 2.0, 3.0 : 4.0]", datasetBoundary.toString());
+ }
+}
\ No newline at end of file
diff --git a/docs/api/sql/Parameter.md b/docs/api/sql/Parameter.md
index 18dc92e6..11a01620 100644
--- a/docs/api/sql/Parameter.md
+++ b/docs/api/sql/Parameter.md
@@ -11,9 +11,13 @@ sparkSession = SparkSession.builder().
```
2. Check your current SedonaSQL configuration:
```Scala
-val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+val sedonaConf = new SedonaConf(sparkSession.conf)
println(sedonaConf)
```
+3. Sedona parameters can be changed at runtime:
+```Scala
+sparkSession.conf.set("sedona.global.index","false")
+```
## Explanation
* sedona.global.index
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index fbd1f958..ce00499d 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -110,7 +110,7 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy {
None
}
- val sedonaConf = new SedonaConf(sparkSession.sparkContext.conf)
+ val sedonaConf = new SedonaConf(sparkSession.conf)
if ((broadcastLeft || broadcastRight) && sedonaConf.getUseIndex) {
queryDetection match {
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
index eb912798..506d7033 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala
@@ -52,7 +52,7 @@ trait TraitJoinQueryExec extends TraitJoinQueryBase {
val leftResultsRaw = left.execute().asInstanceOf[RDD[UnsafeRow]]
val rightResultsRaw = right.execute().asInstanceOf[RDD[UnsafeRow]]
- val sedonaConf = new SedonaConf(sparkContext.conf)
+ val sedonaConf = SedonaConf.fromActiveSession
val (leftShapes, rightShapes) =
toSpatialRddPair(leftResultsRaw, boundLeftShape, rightResultsRaw, boundRightShape)
@@ -61,13 +61,9 @@ trait TraitJoinQueryExec extends TraitJoinQueryBase {
if (sedonaConf.getJoinApproximateTotalCount == -1) {
if (sedonaConf.getJoinSparitionDominantSide == JoinSparitionDominantSide.LEFT) {
leftShapes.analyze()
- sedonaConf.setJoinApproximateTotalCount(leftShapes.approximateTotalCount)
- sedonaConf.setDatasetBoundary(leftShapes.boundaryEnvelope)
}
else {
rightShapes.analyze()
- sedonaConf.setJoinApproximateTotalCount(rightShapes.approximateTotalCount)
- sedonaConf.setDatasetBoundary(rightShapes.boundaryEnvelope)
}
}
log.info("[SedonaSQL] Number of partitions on the left: " + leftResultsRaw.partitions.size)
diff --git a/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala b/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index 48684bc5..596e561b 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -31,7 +31,7 @@ class predicateJoinTestScala extends TestBaseScala {
describe("Sedona-SQL Predicate Join Test") {
it("Passed ST_Contains in a join") {
- val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+ val sedonaConf = new SedonaConf(sparkSession.conf)
println(sedonaConf)
var polygonCsvDf = sparkSession.read.format("csv").option("delimiter", ",").option("header", "false").load(csvPolygonInputLocation)
@@ -129,7 +129,7 @@ class predicateJoinTestScala extends TestBaseScala {
}
it("Passed ST_Covers in a join") {
- val sedonaConf = new SedonaConf(sparkSession.sparkContext.getConf)
+ val sedonaConf = new SedonaConf(sparkSession.conf)
println(sedonaConf)
var polygonCsvDf = sparkSession.read.format("csv").option("delimiter", ",").option("header", "false").load(csvPolygonInputLocation)