This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8c512f467 [KYUUBI #3444] Support the planOnly mode of kyuubi spark
engine support SQL lineage
8c512f467 is described below
commit 8c512f467b92fcaada17e01780073ffd5bac1bba
Author: odone <[email protected]>
AuthorDate: Fri Sep 1 18:02:31 2023 +0800
[KYUUBI #3444] Support the planOnly mode of kyuubi spark engine support SQL
lineage
Closes #3444
### _Why are the changes needed?_
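
Judging from the diff below, the Spark engine gains a `lineage` value for `kyuubi.operation.plan.only.mode` without a compile-time dependency on the lineage plugin. The engine reads a class name from `kyuubi.lineage.parser.plugin.provider`, checks that the class is on the classpath, and calls its `parse` method reflectively with a `null` receiver. This works for a top-level Scala `object` because the compiler emits static forwarders for its methods. The following is a minimal sketch of that pattern, not the engine's code: the object name `ReflectiveProviderSketch` and the helper `invokeStatic` are hypothetical, and `java.lang.Integer.parseInt` stands in for the provider so the example runs without Spark.

```scala
// Illustrative sketch only: java.lang.Integer.parseInt stands in for the
// provider's static-style `parse` method, so the example runs without Spark.
object ReflectiveProviderSketch {

  // Mirrors the presence check in the diff, using plain Class.forName.
  def classIsPresent(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }

  // Looks up a one-argument static method by name and invokes it with a
  // null receiver, which is how static methods are called reflectively.
  def invokeStatic(className: String, methodName: String, arg: String): String = {
    if (!classIsPresent(className)) {
      throw new IllegalStateException(s"'$className' not found")
    }
    val result = Class.forName(className)
      .getMethod(methodName, classOf[String])
      .invoke(null, arg)
    String.valueOf(result)
  }

  def main(args: Array[String]): Unit = {
    println(invokeStatic("java.lang.Integer", "parseInt", "42"))
    println(classIsPresent("org.example.MissingProvider"))
  }
}
```

Resolving the provider by name keeps the plugin optional: when the class is absent, the caller gets a clear "not found" error instead of a linkage failure at engine startup.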
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
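
The tests added in this commit compare the single result column against a one-line JSON string that is written as a margin-stripped multi-line literal and then joined. A minimal sketch of that construction, reusing the expected lineage from the tests for `select * from t0` (the object name `LineageJsonSketch` is hypothetical):

```scala
object LineageJsonSketch {

  // The literal is split over lines for readability; stripMargin removes the
  // leading whitespace and '|' on each line, and split/mkString drops the
  // newlines so the result is a single-line JSON string.
  val expected: String =
    """
      |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
      |"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
      |""".stripMargin.split("\n").mkString("")

  def main(args: Array[String]): Unit = println(expected)
}
```

Note that both tests also accept the "not found" error raised when the lineage plugin is missing from the classpath, so they pass whether or not the plugin is installed.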
Closes #3558 from iodone/kyuubi-3444.
Closes #3444
acaa72afe [odone] remove plugin dependency from kyuubi spark engine
739f7dd5b [odone] remove plugin dependency from kyuubi spark engine
1146eb6e0 [odone] kyuubi-3444
Authored-by: odone <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
docs/configuration/settings.md | 8 ++++-
.../plugin/lineage/LineageParserProvider.scala | 32 ++++-------------
externals/kyuubi-spark-sql-engine/pom.xml | 7 ++++
.../engine/spark/operation/PlanOnlyStatement.scala | 41 ++++++++++++++++++++--
.../org/apache/spark/kyuubi/SparkUtilsHelper.scala | 9 +++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 12 +++++--
.../org/apache/kyuubi/operation/PlanOnlyMode.scala | 3 ++
.../apache/kyuubi/operation/SparkQueryTests.scala | 29 +++++++++++++++
.../kyuubi/operation/PlanOnlyOperationSuite.scala | 28 +++++++++++++++
9 files changed, 138 insertions(+), 31 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 674568ad2..d6d142548 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -324,6 +324,12 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M |
The period for which the Kyuubi server retains application information after
the application terminates.
| duration | 1.7.1 |
| kyuubi.kubernetes.trust.certificates | false | If
set to true then client can submit to kubernetes cluster only with token
| boolean | 1.7.0 |
+### Lineage
+
+| Key | Default
| Meaning | Type
| Since |
+|---------------------------------------|--------------------------------------------------------|---------------------------------------------------|--------|-------|
+| kyuubi.lineage.parser.plugin.provider |
org.apache.kyuubi.plugin.lineage.LineageParserProvider | The provider for the
Spark lineage parser plugin. | string | 1.8.0 |
+
### Metadata
| Key |
Default |
Meaning
[...]
@@ -367,7 +373,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.operation.language | SQL
| Choose a programing
language for the following inputs<ul><li>SQL: (Default) Run all following
statements as SQL queries.</li><li>SCALA: Run all following input as scala
codes</li><li>PYTHON: (Experimental) Run all following input as Python codes
with Spark engine</li></ul>
[...]
| kyuubi.operation.log.dir.root | server_operation_logs
| Root directory for
query operation log at server-side.
[...]
| kyuubi.operation.plan.only.excludes |
SetCatalogAndNamespace,UseStatement,SetNamespaceCommand,SetCommand,ResetCommand
| Comma-separated list of query plan names, in the form of simple class names,
i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary
plans, such as `switch databases`, `set properties`, or `create temporary view`
etc., which are used for setup evaluating environments for analyzing actual
queries, we can use this config to exclude them [...]
-| kyuubi.operation.plan.only.mode | none
| Configures the
statement performed mode, The value can be 'parse', 'analyze', 'optimize',
'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none',
indicate to the statement will be fully executed, otherwise only way without
executing the query. different engines currently support different modes, the
Spark engine supports all modes, a [...]
+| kyuubi.operation.plan.only.mode | none
| Configures the
statement performed mode, The value can be 'parse', 'analyze', 'optimize',
'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is
'none', indicate to the statement will be fully executed, otherwise only way
without executing the query. different engines currently support different
modes, the Spark engine supports al [...]
| kyuubi.operation.plan.only.output.style | plain
| Configures the planOnly
output style. The value can be 'plain' or 'json', and the default value is
'plain'. This configuration supports only the output styles of the Spark engine
[...]
| kyuubi.operation.progress.enabled | false
| Whether to enable the
operation progress. When true, the operation progress will be returned in
`GetOperationStatus`.
[...]
| kyuubi.operation.query.timeout | <undefined>
| Timeout for query
executions at server-side, take effect with client-side
timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be
cancelled automatically if timeout. It's off by default, which means only
client-side take full control of whether the query should timeout or not. If
set, client-side timeout is capped at this point. To [...]
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
similarity index 56%
copy from
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
copy to
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
index e2f51e648..665efef10 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
@@ -15,32 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark.kyuubi
+package org.apache.kyuubi.plugin.lineage
-import scala.util.matching.Regex
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
-
-import org.apache.kyuubi.Logging
-
-/**
- * A place to invoke non-public APIs of [[Utils]], anything to be added here
need to
- * think twice
- */
-object SparkUtilsHelper extends Logging {
-
- /**
- * Redact the sensitive information in the given string.
- */
- def redact(regex: Option[Regex], text: String): String = {
- Utils.redact(regex, text)
- }
-
- /**
- * Get the path of a temporary directory.
- */
- def getLocalDir(conf: SparkConf): String = {
- Utils.getLocalDir(conf)
+import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
+object LineageParserProvider {
+ def parse(spark: SparkSession, plan: LogicalPlan): Lineage = {
+ SparkSQLLineageParseHelper(spark).parse(plan)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml
b/externals/kyuubi-spark-sql-engine/pom.xml
index 52ceb945d..c27f85fa9 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -170,6 +170,13 @@
<artifactId>flexmark-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 73f856fdf..4f8808313 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,14 +17,17 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.Row
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.kyuubi.SparkUtilsHelper
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES,
OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator,
ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode,
OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode,
PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER,
OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator,
ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle,
OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle,
PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError,
unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError,
unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
@@ -119,6 +122,9 @@ class PlanOnlyStatement(
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toString())))
+ case LineageMode =>
+ val result = parseLineage(spark, plan)
+ iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ => throw notSupportedModeError(mode, "Spark SQL")
}
@@ -143,10 +149,39 @@ class PlanOnlyStatement(
case ExecutionMode =>
val executed = spark.sql(statement).queryExecution.executedPlan
iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
+ case LineageMode =>
+ val result = parseLineage(spark, plan)
+ iter = new IterableFetchIterator(Seq(Row(result)))
case UnknownMode => throw unknownModeError(mode)
case _ =>
throw KyuubiSQLException(s"The operation mode $mode" +
" doesn't support in Spark SQL engine.")
}
}
+
+ private def parseLineage(spark: SparkSession, plan: LogicalPlan): String = {
+ val analyzed = spark.sessionState.analyzer.execute(plan)
+ spark.sessionState.analyzer.checkAnalysis(analyzed)
+ val optimized = spark.sessionState.optimizer.execute(analyzed)
+ val parserProviderClass =
session.sessionManager.getConf.get(LINEAGE_PARSER_PLUGIN_PROVIDER)
+
+ try {
+ if (!SparkUtilsHelper.classesArePresent(
+ parserProviderClass)) {
+ throw new Exception(s"'$parserProviderClass' not found," +
+ " need to install kyuubi-spark-lineage plugin before using the
'lineage' mode")
+ }
+
+ val lineage = Class.forName(parserProviderClass)
+ .getMethod("parse", classOf[SparkSession], classOf[LogicalPlan])
+ .invoke(null, spark, optimized)
+
+ val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ mapper.writeValueAsString(lineage)
+ } catch {
+ case e: Throwable =>
+ throw KyuubiSQLException(s"Extract columns lineage failed:
${e.getMessage}", e)
+ }
+ }
+
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
index e2f51e648..106be3fc7 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
@@ -43,4 +43,13 @@ object SparkUtilsHelper extends Logging {
def getLocalDir(conf: SparkConf): String = {
Utils.getLocalDir(conf)
}
+
+ def classesArePresent(className: String): Boolean = {
+ try {
+ Utils.classForName(className)
+ true
+ } catch {
+ case _: ClassNotFoundException | _: NoClassDefFoundError => false
+ }
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 88c61e23d..bbbb73b95 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2360,7 +2360,7 @@ object KyuubiConf {
val OPERATION_PLAN_ONLY_MODE: ConfigEntry[String] =
buildConf("kyuubi.operation.plan.only.mode")
.doc("Configures the statement performed mode, The value can be 'parse',
'analyze', " +
- "'optimize', 'optimize_with_stats', 'physical', 'execution', or
'none', " +
+ "'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage'
or 'none', " +
"when it is 'none', indicate to the statement will be fully executed,
otherwise " +
"only way without executing the query. different engines currently
support different " +
"modes, the Spark engine supports all modes, and the Flink engine
supports 'parse', " +
@@ -2377,10 +2377,11 @@ object KyuubiConf {
"OPTIMIZE_WITH_STATS",
"PHYSICAL",
"EXECUTION",
+ "LINEAGE",
"NONE").contains(mode),
"Invalid value for 'kyuubi.operation.plan.only.mode'. Valid values
are" +
"'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical',
'execution' and " +
- "'none'.")
+ "'lineage', 'none'.")
.createWithDefault(NoneMode.name)
val OPERATION_PLAN_ONLY_OUT_STYLE: ConfigEntry[String] =
@@ -2412,6 +2413,13 @@ object KyuubiConf {
"UseStatement",
"SetCatalogAndNamespace"))
+ val LINEAGE_PARSER_PLUGIN_PROVIDER: ConfigEntry[String] =
+ buildConf("kyuubi.lineage.parser.plugin.provider")
+ .doc("The provider for the Spark lineage parser plugin.")
+ .version("1.8.0")
+ .stringConf
+
.createWithDefault("org.apache.kyuubi.plugin.lineage.LineageParserProvider")
+
object OperationLanguages extends Enumeration with Logging {
type OperationLanguage = Value
val PYTHON, SQL, SCALA, UNKNOWN = Value
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
index 3e170f05f..0407dab62 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
@@ -41,6 +41,8 @@ case object PhysicalMode extends PlanOnlyMode { val name =
"physical" }
case object ExecutionMode extends PlanOnlyMode { val name = "execution" }
+case object LineageMode extends PlanOnlyMode { val name = "lineage" }
+
case object NoneMode extends PlanOnlyMode { val name = "none" }
case object UnknownMode extends PlanOnlyMode {
@@ -64,6 +66,7 @@ object PlanOnlyMode {
case OptimizeWithStatsMode.name => OptimizeWithStatsMode
case PhysicalMode.name => PhysicalMode
case ExecutionMode.name => ExecutionMode
+ case LineageMode.name => LineageMode
case NoneMode.name => NoneMode
case other => UnknownMode.mode(other)
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 9c00a11ba..20d3f6fad 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -218,6 +218,35 @@ trait SparkQueryTests extends SparkDataTypeTests with
HiveJDBCTestHelper {
}
}
+ test("kyuubi #3444: Plan only mode with lineage mode") {
+
+ val ddl = "create table if not exists t0(a int) using parquet"
+ val dql = "select * from t0"
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key ->
NoneMode.name))() {
+ withJdbcStatement("t0") { statement =>
+ statement.execute(ddl)
+ statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+ val lineageParserClassName =
"org.apache.kyuubi.plugin.lineage.LineageParserProvider"
+
+ try {
+ val resultSet = statement.executeQuery(dql)
+ assert(resultSet.next())
+ val actualResult =
+ """
+ |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+
|"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
+ |""".stripMargin.split("\n").mkString("")
+ assert(resultSet.getString(1) == actualResult)
+ } catch {
+ case e: Throwable =>
+ assert(e.getMessage.contains(s"'$lineageParserClassName' not
found"))
+ } finally {
+ statement.execute("SET kyuubi.operation.plan.only.mode=none")
+ }
+ }
+ }
+ }
+
test("execute simple scala code") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 6a37e823d..8773440a6 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -201,6 +201,34 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with
HiveJDBCTestHelper {
}
}
+ test("kyuubi #3444: Plan only mode with lineage mode") {
+
+ val ddl = "create table if not exists t0(a int) using parquet"
+ val dql = "select * from t0"
+ withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key ->
NoneMode.name))() {
+ withJdbcStatement("t0") { statement =>
+ statement.execute(ddl)
+ statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+ val lineageParserClassName =
"org.apache.kyuubi.plugin.lineage.LineageParserProvider"
+ try {
+ val resultSet = statement.executeQuery(dql)
+ assert(resultSet.next())
+ val actualResult =
+ """
+ |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+
|"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
+ |""".stripMargin.split("\n").mkString("")
+ assert(resultSet.getString(1) == actualResult)
+ } catch {
+ case e: Throwable =>
+ assert(e.getMessage.contains(s"'$lineageParserClassName' not
found"))
+ } finally {
+ statement.execute("SET kyuubi.operation.plan.only.mode=none")
+ }
+ }
+ }
+ }
+
private def getOperationPlanWithStatement(statement: Statement): String = {
val resultSet = statement.executeQuery("select 1 where true")
assert(resultSet.next())