This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 12903e1cfa [SEDONA-636] fix some bugs when sedona parser running with other parser (#1706)
12903e1cfa is described below
commit 12903e1cfa92b7ae4ec7fc67806a8155814fdd95
Author: freamdx <[email protected]>
AuthorDate: Wed Dec 11 08:47:10 2024 +0800
[SEDONA-636] fix some bugs when sedona parser running with other parser (#1706)
---
.../org/apache/sedona/spark/SedonaContext.scala | 5 +-
.../org/apache/sedona/sql/ParserRegistrator.scala | 60 ----------------------
.../apache/sedona/sql/SedonaSqlExtensions.scala | 11 ++++
.../apache/sedona/sql/parser/SedonaSqlParser.scala | 8 +--
.../org/apache/sedona/sql/SQLSyntaxTestScala.scala | 23 +++++++--
.../org/apache/sedona/sql/TestBaseScala.scala | 5 ++
.../apache/sedona/sql/parser/SedonaSqlParser.scala | 8 +--
.../org/apache/sedona/sql/SQLSyntaxTestScala.scala | 23 +++++++--
.../org/apache/sedona/sql/TestBaseScala.scala | 5 ++
.../apache/sedona/sql/parser/SedonaSqlParser.scala | 8 +--
.../org/apache/sedona/sql/SQLSyntaxTestScala.scala | 23 +++++++--
.../org/apache/sedona/sql/TestBaseScala.scala | 5 ++
12 files changed, 90 insertions(+), 94 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 4c8fcab692..db266c38fb 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -20,7 +20,7 @@ package org.apache.sedona.spark
import org.apache.sedona.common.utils.TelemetryCollector
import org.apache.sedona.core.serde.SedonaKryoRegistrator
-import org.apache.sedona.sql.{ParserRegistrator, RasterRegistrator}
+import org.apache.sedona.sql.RasterRegistrator
import org.apache.sedona.sql.UDF.UdfRegistrator
import org.apache.sedona.sql.UDT.UdtRegistrator
import org.apache.spark.serializer.KryoSerializer
@@ -65,9 +65,6 @@ object SedonaContext {
RasterRegistrator.registerAll(sparkSession)
UdtRegistrator.registerAll()
UdfRegistrator.registerAll(sparkSession)
- if (sparkSession.conf.get("spark.sedona.enableParserExtension",
"true").toBoolean) {
- ParserRegistrator.register(sparkSession)
- }
sparkSession
}
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/ParserRegistrator.scala b/spark/common/src/main/scala/org/apache/sedona/sql/ParserRegistrator.scala
deleted file mode 100644
index db3c623a09..0000000000
--- a/spark/common/src/main/scala/org/apache/sedona/sql/ParserRegistrator.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sedona.sql
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.parser.ParserFactory
-
-object ParserRegistrator {
-
- /**
- * Register the custom Sedona Spark parser
- * @param sparkSession
- */
- def register(sparkSession: SparkSession): Unit = {
- // try to register the parser with the new constructor for spark 3.1 and
above
- try {
- val parserClassName = "org.apache.sedona.sql.parser.SedonaSqlParser"
- val delegate: ParserInterface = sparkSession.sessionState.sqlParser
-
- val parser = ParserFactory.getParser(parserClassName, delegate)
- val field =
sparkSession.sessionState.getClass.getDeclaredField("sqlParser")
- field.setAccessible(true)
- field.set(sparkSession.sessionState, parser)
- return // return if the new constructor is available
- } catch {
- case _: Exception =>
- }
-
- // try to register the parser with the legacy constructor for spark 3.0
- try {
- val parserClassName = "org.apache.sedona.sql.parser.SedonaSqlParser"
- val delegate: ParserInterface = sparkSession.sessionState.sqlParser
-
- val parser =
- ParserFactory.getParser(parserClassName,
sparkSession.sessionState.conf, delegate)
- val field =
sparkSession.sessionState.getClass.getDeclaredField("sqlParser")
- field.setAccessible(true)
- field.set(sparkSession.sessionState, parser)
- } catch {
- case _: Exception =>
- }
- }
-}
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala b/spark/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
index be0774ac90..fbc3567192 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/SedonaSqlExtensions.scala
@@ -19,13 +19,24 @@
package org.apache.sedona.sql
import org.apache.sedona.spark.SedonaContext
+import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.parser.ParserFactory
class SedonaSqlExtensions extends (SparkSessionExtensions => Unit) {
+ private lazy val enableParser =
+
SparkContext.getOrCreate().getConf.get("spark.sedona.enableParserExtension",
"true").toBoolean
+
def apply(e: SparkSessionExtensions): Unit = {
e.injectCheckRule(spark => {
SedonaContext.create(spark)
_ => ()
})
+
+ if (enableParser) {
+ e.injectParser { case (_, parser) =>
+
ParserFactory.getParser("org.apache.sedona.sql.parser.SedonaSqlParser", parser)
+ }
+ }
}
}
diff --git a/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala b/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
index 6c70419122..56c27ba76b 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
@@ -35,12 +35,8 @@ class SedonaSqlParser(delegate: ParserInterface) extends SparkSqlParser {
override def parsePlan(sqlText: String): LogicalPlan =
try {
parse(sqlText) { parser =>
- parserBuilder.visit(parser.singleStatement()) match {
- case plan: LogicalPlan => plan
- case _ =>
- delegate.parsePlan(sqlText)
- }
- }
+ parserBuilder.visit(parser.singleStatement())
+ }.asInstanceOf[LogicalPlan]
} catch {
case _: Exception =>
delegate.parsePlan(sqlText)
diff --git a/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala b/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
index 72680aacd4..6f873d0a08 100644
--- a/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
+++ b/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
@@ -44,14 +44,29 @@ class SQLSyntaxTestScala extends TestBaseScala with TableDrivenPropertyChecks {
it(
"should be able to create a regular table with geometry column should
work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ try {
+ sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
it(
"should be able to create a regular table with regular and geometry
column should work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT,
GEO_COL GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ try {
+ sparkSession.sql(
+ "CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT, GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
}
}
diff --git a/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index f629648b29..8d13f6138d 100644
--- a/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.3/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -23,6 +23,8 @@ import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.scalatest.{BeforeAndAfterAll, FunSpec}
+import java.util.concurrent.ThreadLocalRandom
+
trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getRootLogger().setLevel(Level.WARN)
Logger.getLogger("org.apache").setLevel(Level.WARN)
@@ -30,6 +32,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
+ val keyParserExtension = "spark.sedona.enableParserExtension"
val warehouseLocation = System.getProperty("user.dir") + "/target/"
val sparkSession = SedonaContext
.builder()
@@ -38,6 +41,8 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("sedona.join.autoBroadcastJoinThreshold", "-1")
.config("spark.sql.session.timeZone", "UTC")
+ .config("spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions")
+ .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
.getOrCreate()
val sparkSessionMinio = SedonaContext
diff --git a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
index 6c70419122..56c27ba76b 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
@@ -35,12 +35,8 @@ class SedonaSqlParser(delegate: ParserInterface) extends SparkSqlParser {
override def parsePlan(sqlText: String): LogicalPlan =
try {
parse(sqlText) { parser =>
- parserBuilder.visit(parser.singleStatement()) match {
- case plan: LogicalPlan => plan
- case _ =>
- delegate.parsePlan(sqlText)
- }
- }
+ parserBuilder.visit(parser.singleStatement())
+ }.asInstanceOf[LogicalPlan]
} catch {
case _: Exception =>
delegate.parsePlan(sqlText)
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
index 72680aacd4..6f873d0a08 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
@@ -44,14 +44,29 @@ class SQLSyntaxTestScala extends TestBaseScala with TableDrivenPropertyChecks {
it(
"should be able to create a regular table with geometry column should
work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ try {
+ sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
it(
"should be able to create a regular table with regular and geometry
column should work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT,
GEO_COL GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ try {
+ sparkSession.sql(
+ "CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT, GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
}
}
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 34746d0b28..ae1ed5d091 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -23,6 +23,8 @@ import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.scalatest.{BeforeAndAfterAll, FunSpec}
+import java.util.concurrent.ThreadLocalRandom
+
trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getRootLogger().setLevel(Level.WARN)
Logger.getLogger("org.apache").setLevel(Level.WARN)
@@ -30,6 +32,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
+ val keyParserExtension = "spark.sedona.enableParserExtension"
val warehouseLocation = System.getProperty("user.dir") + "/target/"
val sparkSession = SedonaContext
.builder()
@@ -38,6 +41,8 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
.config("spark.sql.warehouse.dir", warehouseLocation)
// We need to be explicit about broadcasting in tests.
.config("sedona.join.autoBroadcastJoinThreshold", "-1")
+ .config("spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions")
+ .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
.getOrCreate()
val sparkSessionMinio = SedonaContext
diff --git a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
index 6c70419122..56c27ba76b 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/sedona/sql/parser/SedonaSqlParser.scala
@@ -35,12 +35,8 @@ class SedonaSqlParser(delegate: ParserInterface) extends SparkSqlParser {
override def parsePlan(sqlText: String): LogicalPlan =
try {
parse(sqlText) { parser =>
- parserBuilder.visit(parser.singleStatement()) match {
- case plan: LogicalPlan => plan
- case _ =>
- delegate.parsePlan(sqlText)
- }
- }
+ parserBuilder.visit(parser.singleStatement())
+ }.asInstanceOf[LogicalPlan]
} catch {
case _: Exception =>
delegate.parsePlan(sqlText)
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
index 72680aacd4..6f873d0a08 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
@@ -44,14 +44,29 @@ class SQLSyntaxTestScala extends TestBaseScala with TableDrivenPropertyChecks {
it(
"should be able to create a regular table with geometry column should
work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ try {
+ sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
it(
"should be able to create a regular table with regular and geometry
column should work without a workaround") {
- sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT,
GEO_COL GEOMETRY)")
- sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ try {
+ sparkSession.sql(
+ "CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT, GEO_COL
GEOMETRY)")
+ sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should
be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("true")
+ } catch {
+ case ex: Exception =>
+ ex.getClass.getName.endsWith("ParseException") should be(true)
+ sparkSession.sparkContext.getConf.get(keyParserExtension) should
be("false")
+ }
}
}
}
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 34746d0b28..ae1ed5d091 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -23,6 +23,8 @@ import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.scalatest.{BeforeAndAfterAll, FunSpec}
+import java.util.concurrent.ThreadLocalRandom
+
trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getRootLogger().setLevel(Level.WARN)
Logger.getLogger("org.apache").setLevel(Level.WARN)
@@ -30,6 +32,7 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
+ val keyParserExtension = "spark.sedona.enableParserExtension"
val warehouseLocation = System.getProperty("user.dir") + "/target/"
val sparkSession = SedonaContext
.builder()
@@ -38,6 +41,8 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
.config("spark.sql.warehouse.dir", warehouseLocation)
// We need to be explicit about broadcasting in tests.
.config("sedona.join.autoBroadcastJoinThreshold", "-1")
+ .config("spark.sql.extensions",
"org.apache.sedona.sql.SedonaSqlExtensions")
+ .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
.getOrCreate()
val sparkSessionMinio = SedonaContext