This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c512f467 [KYUUBI #3444] Support SQL lineage in the planOnly mode of the Kyuubi Spark engine
8c512f467 is described below

commit 8c512f467b92fcaada17e01780073ffd5bac1bba
Author: odone <[email protected]>
AuthorDate: Fri Sep 1 18:02:31 2023 +0800

    [KYUUBI #3444] Support SQL lineage in the planOnly mode of the Kyuubi Spark engine
    
    Closes #3444
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3558 from iodone/kyuubi-3444.
    
    Closes #3444
    
    acaa72afe [odone] remove plugin dependency from kyuubi spark engine
    739f7dd5b [odone] remove plugin dependency from kyuubi spark engine
    1146eb6e0 [odone] kyuubi-3444
    
    Authored-by: odone <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 docs/configuration/settings.md                     |  8 ++++-
 .../plugin/lineage/LineageParserProvider.scala     | 32 ++++-------------
 externals/kyuubi-spark-sql-engine/pom.xml          |  7 ++++
 .../engine/spark/operation/PlanOnlyStatement.scala | 41 ++++++++++++++++++++--
 .../org/apache/spark/kyuubi/SparkUtilsHelper.scala |  9 +++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 12 +++++--
 .../org/apache/kyuubi/operation/PlanOnlyMode.scala |  3 ++
 .../apache/kyuubi/operation/SparkQueryTests.scala  | 29 +++++++++++++++
 .../kyuubi/operation/PlanOnlyOperationSuite.scala  | 28 +++++++++++++++
 9 files changed, 138 insertions(+), 31 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 674568ad2..d6d142548 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -324,6 +324,12 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M              | 
The period for which the Kyuubi server retains application information after 
the application terminates.                                                     
                                                     | duration | 1.7.1 |
 | kyuubi.kubernetes.trust.certificates                | false             | If 
set to true then client can submit to kubernetes cluster only with token        
                                                                                
                                               | boolean  | 1.7.0 |
 
+### Lineage
+
+|                  Key                  |                        Default       
                  |                      Meaning                      |  Type  
| Since |
+|---------------------------------------|--------------------------------------------------------|---------------------------------------------------|--------|-------|
+| kyuubi.lineage.parser.plugin.provider | 
org.apache.kyuubi.plugin.lineage.LineageParserProvider | The provider for the 
Spark lineage parser plugin. | string | 1.8.0 |
+
 ### Metadata
 
 |                       Key                       |                         
Default                          |                                              
                                                                                
                                                                                
                                                                                
      Meaning                                                                   
                 [...]
@@ -367,7 +373,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.language                        | SQL                       
                                                      | Choose a programing 
language for the following inputs<ul><li>SQL: (Default) Run all following 
statements as SQL queries.</li><li>SCALA: Run all following input as scala 
codes</li><li>PYTHON: (Experimental) Run all following input as Python codes 
with Spark engine</li></ul>                                                     
                                [...]
 | kyuubi.operation.log.dir.root                    | server_operation_logs     
                                                      | Root directory for 
query operation log at server-side.                                             
                                                                                
                                                                                
                                                                                
                   [...]
 | kyuubi.operation.plan.only.excludes              | 
SetCatalogAndNamespace,UseStatement,SetNamespaceCommand,SetCommand,ResetCommand 
| Comma-separated list of query plan names, in the form of simple class names, 
i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary 
plans, such as `switch databases`, `set properties`, or `create temporary view` 
etc., which are used for setup evaluating environments for analyzing actual 
queries, we can use this config to exclude them  [...]
-| kyuubi.operation.plan.only.mode                  | none                      
                                                      | Configures the 
statement performed mode, The value can be 'parse', 'analyze', 'optimize', 
'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', 
indicate to the statement will be fully executed, otherwise only way without 
executing the query. different engines currently support different modes, the 
Spark engine supports all modes, a [...]
+| kyuubi.operation.plan.only.mode                  | none                      
                                                      | Configures the 
statement performed mode, The value can be 'parse', 'analyze', 'optimize', 
'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is 
'none', indicate to the statement will be fully executed, otherwise only way 
without executing the query. different engines currently support different 
modes, the Spark engine supports al [...]
 | kyuubi.operation.plan.only.output.style          | plain                     
                                                      | Configures the planOnly 
output style. The value can be 'plain' or 'json', and the default value is 
'plain'. This configuration supports only the output styles of the Spark engine 
                                                                                
                                                                                
                   [...]
 | kyuubi.operation.progress.enabled                | false                     
                                                      | Whether to enable the 
operation progress. When true, the operation progress will be returned in 
`GetOperationStatus`.                                                           
                                                                                
                                                                                
                      [...]
 | kyuubi.operation.query.timeout                   | &lt;undefined&gt;         
                                                      | Timeout for query 
executions at server-side, take effect with client-side 
timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be 
cancelled automatically if timeout. It's off by default, which means only 
client-side take full control of whether the query should timeout or not. If 
set, client-side timeout is capped at this point. To [...]
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
similarity index 56%
copy from 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
copy to 
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
index e2f51e648..665efef10 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageParserProvider.scala
@@ -15,32 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.kyuubi
+package org.apache.kyuubi.plugin.lineage
 
-import scala.util.matching.Regex
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
-import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
-
-import org.apache.kyuubi.Logging
-
-/**
- * A place to invoke non-public APIs of [[Utils]], anything to be added here 
need to
- * think twice
- */
-object SparkUtilsHelper extends Logging {
-
-  /**
-   * Redact the sensitive information in the given string.
-   */
-  def redact(regex: Option[Regex], text: String): String = {
-    Utils.redact(regex, text)
-  }
-
-  /**
-   * Get the path of a temporary directory.
-   */
-  def getLocalDir(conf: SparkConf): String = {
-    Utils.getLocalDir(conf)
+import org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper
+object LineageParserProvider {
+  def parse(spark: SparkSession, plan: LogicalPlan): Lineage = {
+    SparkSQLLineageParseHelper(spark).parse(plan)
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml 
b/externals/kyuubi-spark-sql-engine/pom.xml
index 52ceb945d..c27f85fa9 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -170,6 +170,13 @@
             <artifactId>flexmark-all</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            
<artifactId>kyuubi-spark-lineage_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 73f856fdf..4f8808313 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,14 +17,17 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
-import org.apache.spark.sql.Row
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.kyuubi.SparkUtilsHelper
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, 
OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, 
ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode, 
OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, 
PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, 
OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, 
ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, 
OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, 
PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
 import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, 
unknownModeError}
 import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, 
unknownStyleError}
 import org.apache.kyuubi.operation.log.OperationLog
@@ -119,6 +122,9 @@ class PlanOnlyStatement(
       case ExecutionMode =>
         val executed = spark.sql(statement).queryExecution.executedPlan
         iter = new IterableFetchIterator(Seq(Row(executed.toString())))
+      case LineageMode =>
+        val result = parseLineage(spark, plan)
+        iter = new IterableFetchIterator(Seq(Row(result)))
       case UnknownMode => throw unknownModeError(mode)
       case _ => throw notSupportedModeError(mode, "Spark SQL")
     }
@@ -143,10 +149,39 @@ class PlanOnlyStatement(
       case ExecutionMode =>
         val executed = spark.sql(statement).queryExecution.executedPlan
         iter = new IterableFetchIterator(Seq(Row(executed.toJSON)))
+      case LineageMode =>
+        val result = parseLineage(spark, plan)
+        iter = new IterableFetchIterator(Seq(Row(result)))
       case UnknownMode => throw unknownModeError(mode)
       case _ =>
         throw KyuubiSQLException(s"The operation mode $mode" +
           " doesn't support in Spark SQL engine.")
     }
   }
+
+  private def parseLineage(spark: SparkSession, plan: LogicalPlan): String = {
+    val analyzed = spark.sessionState.analyzer.execute(plan)
+    spark.sessionState.analyzer.checkAnalysis(analyzed)
+    val optimized = spark.sessionState.optimizer.execute(analyzed)
+    val parserProviderClass = 
session.sessionManager.getConf.get(LINEAGE_PARSER_PLUGIN_PROVIDER)
+
+    try {
+      if (!SparkUtilsHelper.classesArePresent(
+          parserProviderClass)) {
+        throw new Exception(s"'$parserProviderClass' not found," +
+          " need to install kyuubi-spark-lineage plugin before using the 
'lineage' mode")
+      }
+
+      val lineage = Class.forName(parserProviderClass)
+        .getMethod("parse", classOf[SparkSession], classOf[LogicalPlan])
+        .invoke(null, spark, optimized)
+
+      val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+      mapper.writeValueAsString(lineage)
+    } catch {
+      case e: Throwable =>
+        throw KyuubiSQLException(s"Extract columns lineage failed: 
${e.getMessage}", e)
+    }
+  }
+
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
index e2f51e648..106be3fc7 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
@@ -43,4 +43,13 @@ object SparkUtilsHelper extends Logging {
   def getLocalDir(conf: SparkConf): String = {
     Utils.getLocalDir(conf)
   }
+
+  def classesArePresent(className: String): Boolean = {
+    try {
+      Utils.classForName(className)
+      true
+    } catch {
+      case _: ClassNotFoundException | _: NoClassDefFoundError => false
+    }
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 88c61e23d..bbbb73b95 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2360,7 +2360,7 @@ object KyuubiConf {
   val OPERATION_PLAN_ONLY_MODE: ConfigEntry[String] =
     buildConf("kyuubi.operation.plan.only.mode")
       .doc("Configures the statement performed mode, The value can be 'parse', 
'analyze', " +
-        "'optimize', 'optimize_with_stats', 'physical', 'execution', or 
'none', " +
+        "'optimize', 'optimize_with_stats', 'physical', 'execution', 'lineage' 
or 'none', " +
         "when it is 'none', indicate to the statement will be fully executed, 
otherwise " +
         "only way without executing the query. different engines currently 
support different " +
         "modes, the Spark engine supports all modes, and the Flink engine 
supports 'parse', " +
@@ -2377,10 +2377,11 @@ object KyuubiConf {
             "OPTIMIZE_WITH_STATS",
             "PHYSICAL",
             "EXECUTION",
+            "LINEAGE",
             "NONE").contains(mode),
         "Invalid value for 'kyuubi.operation.plan.only.mode'. Valid values 
are" +
           "'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 
'execution' and " +
-          "'none'.")
+          "'lineage', 'none'.")
       .createWithDefault(NoneMode.name)
 
   val OPERATION_PLAN_ONLY_OUT_STYLE: ConfigEntry[String] =
@@ -2412,6 +2413,13 @@ object KyuubiConf {
         "UseStatement",
         "SetCatalogAndNamespace"))
 
+  val LINEAGE_PARSER_PLUGIN_PROVIDER: ConfigEntry[String] =
+    buildConf("kyuubi.lineage.parser.plugin.provider")
+      .doc("The provider for the Spark lineage parser plugin.")
+      .version("1.8.0")
+      .stringConf
+      
.createWithDefault("org.apache.kyuubi.plugin.lineage.LineageParserProvider")
+
   object OperationLanguages extends Enumeration with Logging {
     type OperationLanguage = Value
     val PYTHON, SQL, SCALA, UNKNOWN = Value
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
index 3e170f05f..0407dab62 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/PlanOnlyMode.scala
@@ -41,6 +41,8 @@ case object PhysicalMode extends PlanOnlyMode { val name = 
"physical" }
 
 case object ExecutionMode extends PlanOnlyMode { val name = "execution" }
 
+case object LineageMode extends PlanOnlyMode { val name = "lineage" }
+
 case object NoneMode extends PlanOnlyMode { val name = "none" }
 
 case object UnknownMode extends PlanOnlyMode {
@@ -64,6 +66,7 @@ object PlanOnlyMode {
     case OptimizeWithStatsMode.name => OptimizeWithStatsMode
     case PhysicalMode.name => PhysicalMode
     case ExecutionMode.name => ExecutionMode
+    case LineageMode.name => LineageMode
     case NoneMode.name => NoneMode
     case other => UnknownMode.mode(other)
   }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 9c00a11ba..20d3f6fad 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -218,6 +218,35 @@ trait SparkQueryTests extends SparkDataTypeTests with 
HiveJDBCTestHelper {
     }
   }
 
+  test("kyuubi #3444: Plan only mode with lineage mode") {
+
+    val ddl = "create table if not exists t0(a int) using parquet"
+    val dql = "select * from t0"
+    withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> 
NoneMode.name))() {
+      withJdbcStatement("t0") { statement =>
+        statement.execute(ddl)
+        statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+        val lineageParserClassName = 
"org.apache.kyuubi.plugin.lineage.LineageParserProvider"
+
+        try {
+          val resultSet = statement.executeQuery(dql)
+          assert(resultSet.next())
+          val actualResult =
+            """
+              |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+              
|"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
+              |""".stripMargin.split("\n").mkString("")
+          assert(resultSet.getString(1) == actualResult)
+        } catch {
+          case e: Throwable =>
+            assert(e.getMessage.contains(s"'$lineageParserClassName' not 
found"))
+        } finally {
+          statement.execute("SET kyuubi.operation.plan.only.mode=none")
+        }
+      }
+    }
+  }
+
   test("execute simple scala code") {
     withJdbcStatement() { statement =>
       statement.execute("SET kyuubi.operation.language=scala")
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
index 6a37e823d..8773440a6 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/PlanOnlyOperationSuite.scala
@@ -201,6 +201,34 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with 
HiveJDBCTestHelper {
     }
   }
 
+  test("kyuubi #3444: Plan only mode with lineage mode") {
+
+    val ddl = "create table if not exists t0(a int) using parquet"
+    val dql = "select * from t0"
+    withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> 
NoneMode.name))() {
+      withJdbcStatement("t0") { statement =>
+        statement.execute(ddl)
+        statement.execute("SET kyuubi.operation.plan.only.mode=lineage")
+        val lineageParserClassName = 
"org.apache.kyuubi.plugin.lineage.LineageParserProvider"
+        try {
+          val resultSet = statement.executeQuery(dql)
+          assert(resultSet.next())
+          val actualResult =
+            """
+              |{"inputTables":["spark_catalog.default.t0"],"outputTables":[],
+              
|"columnLineage":[{"column":"a","originalColumns":["spark_catalog.default.t0.a"]}]}
+              |""".stripMargin.split("\n").mkString("")
+          assert(resultSet.getString(1) == actualResult)
+        } catch {
+          case e: Throwable =>
+            assert(e.getMessage.contains(s"'$lineageParserClassName' not 
found"))
+        } finally {
+          statement.execute("SET kyuubi.operation.plan.only.mode=none")
+        }
+      }
+    }
+  }
+
   private def getOperationPlanWithStatement(statement: Statement): String = {
     val resultSet = statement.executeQuery("select 1 where true")
     assert(resultSet.next())

Reply via email to