This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new bb671be2b [KYUUBI #4925] Add default catalog using `spark_catalog` with the lineage result
bb671be2b is described below
commit bb671be2bd8cd8a29cb996a81b319daf4cfefd6c
Author: odone <[email protected]>
AuthorDate: Wed Aug 9 18:28:32 2023 +0800
[KYUUBI #4925] Add default catalog using `spark_catalog` with the lineage result
### _Why are the changes needed?_
close #4925
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4932 from iodone/kyuubi-4868.
Closes #4925
ff3195772 [odone] remove the catalog with atlas supporting
fda2fe321 [odone] add default catalog to v1 table when parsing lineage
Authored-by: odone <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
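
In effect, every v1 table and column identifier in a lineage result now carries the session's default catalog as a prefix. A sketch of the before/after behavior, distilled from the test changes below and assuming the stock value of `spark.sql.defaultCatalog` (`spark_catalog`, which is what the new `LineageConf.DEFAULT_CATALOG` reads):

    spark.sql("insert into test_table1 select a, b + c as d from test_table0")
    // before: inputs = List("default.test_table0"), outputs = List("default.test_table1")
    // after:  inputs = List("spark_catalog.default.test_table0"), outputs = List("spark_catalog.default.test_table1")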
---
.../dispatcher/atlas/AtlasEntityHelper.scala | 45 +-
.../helper/SparkSQLLineageParseHelper.scala | 49 +-
.../apache/spark/kyuubi/lineage/LineageConf.scala | 3 +
.../atlas/AtlasLineageDispatcherSuite.scala | 33 +-
.../events/OperationLineageEventSuite.scala | 20 +-
.../helper/SparkSQLLineageParserHelperSuite.scala | 591 ++++++++++++---------
6 files changed, 433 insertions(+), 308 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
index 9575b5258..cfa19b7aa 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import scala.collection.JavaConverters._
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, AtlasRelatedObjectId}
-import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.Lineage
@@ -87,8 +87,9 @@ object AtlasEntityHelper {
if (inputs.nonEmpty && outputs.nonEmpty) {
val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
+ val outputColumnName = buildColumnQualifiedName(columnLineage.column).get
val qualifiedName =
- s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
+ s"${processEntity.getAttribute("qualifiedName")}:${outputColumnName}"
entity.setAttribute("qualifiedName", qualifiedName)
entity.setAttribute("name", qualifiedName)
entity.setRelationshipAttribute("inputs", inputs.asJava)
@@ -104,33 +105,33 @@ object AtlasEntityHelper {
}
def tableObjectId(tableName: String): Option[AtlasObjectId] = {
- val dbTb = tableName.split('.')
- if (dbTb.length == 2) {
- val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
- // TODO parse datasource type
- Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
- } else {
- None
- }
+ buildTableQualifiedName(tableName)
+ .map(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", _))
}
- def tableQualifiedName(cluster: String, db: String, table: String): String = {
- s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
+ def buildTableQualifiedName(tableName: String): Option[String] = {
+ val defaultCatalog = LineageConf.DEFAULT_CATALOG
+ tableName.split('.') match {
+ case Array(`defaultCatalog`, db, table) =>
+ Some(s"${db.toLowerCase}.${table.toLowerCase}@$cluster")
+ case _ =>
+ None
+ }
}
def columnObjectId(columnName: String): Option[AtlasObjectId] = {
- val dbTbCol = columnName.split('.')
- if (dbTbCol.length == 3) {
- val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), dbTbCol(2))
- // TODO parse datasource type
- Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
- } else {
- None
- }
+ buildColumnQualifiedName(columnName)
+ .map(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", _))
}
- def columnQualifiedName(cluster: String, db: String, table: String, column: String): String = {
- s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
+ def buildColumnQualifiedName(columnName: String): Option[String] = {
+ val defaultCatalog = LineageConf.DEFAULT_CATALOG
+ columnName.split('.') match {
+ case Array(`defaultCatalog`, db, table, column) =>
+ Some(s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster")
+ case _ =>
+ None
+ }
}
def objectId(entity: AtlasEntity): AtlasObjectId = {
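
A standalone sketch of the matching logic introduced above (the names and the `cluster` value are illustrative; in the helper, `defaultCatalog` comes from `LineageConf.DEFAULT_CATALOG`). Only names fully qualified with the default catalog map to Atlas hive_table/hive_column qualified names, with the catalog prefix stripped again for Atlas; names under any other catalog (e.g. `v2_catalog`) yield None, since datasource type parsing is still a TODO:

    val defaultCatalog = "spark_catalog" // assumed default of spark.sql.defaultCatalog

    def buildTableQualifiedName(tableName: String, cluster: String): Option[String] =
      tableName.split('.') match {
        case Array(`defaultCatalog`, db, table) =>
          Some(s"${db.toLowerCase}.${table.toLowerCase}@$cluster")
        case _ => None // e.g. v2 catalog tables
      }

    buildTableQualifiedName("spark_catalog.default.test_table0", "primary") // Some("default.test_table0@primary")
    buildTableQualifiedName("v2_catalog.db.tbb", "primary")                 // None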
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index a5f805fa9..dad37eac5 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -199,7 +199,7 @@ trait LineageParser {
} else {
getQuery(plan)
}
- val view = getField[TableIdentifier](plan, "name").unquotedString
+ val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
k.withName(s"$view.${k.name}") -> v
}
@@ -207,7 +207,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateViewCommand"
&& getField[ViewType](plan, "viewType") == PersistedView =>
- val view = getField[TableIdentifier](plan, "name").unquotedString
+ val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
val outputCols =
getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1)
val query =
@@ -223,7 +223,7 @@ trait LineageParser {
}
case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
- val table = getField[CatalogTable](plan, "table").qualifiedName
+ val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -231,7 +231,7 @@ trait LineageParser {
case p
if p.nodeName == "CreateHiveTableAsSelectCommand" ||
p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
- val table = getField[CatalogTable](plan, "tableDesc").qualifiedName
+ val table = getV1TableName(getField[CatalogTable](plan, "tableDesc").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -259,7 +259,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
- val table = logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("")
+ val table = logicalRelation
+ .catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
@@ -267,7 +268,8 @@ trait LineageParser {
case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
val table =
- getField[Option[CatalogTable]](plan, "catalogTable").map(_.qualifiedName)
+ getField[Option[CatalogTable]](plan, "catalogTable")
+ .map(t => getV1TableName(t.qualifiedName))
.getOrElse("")
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
case (k, v) if table.nonEmpty =>
@@ -286,7 +288,7 @@ trait LineageParser {
}
case p if p.nodeName == "InsertIntoHiveTable" =>
- val table = getField[CatalogTable](plan, "table").qualifiedName
+ val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -298,7 +300,7 @@ trait LineageParser {
if p.nodeName == "AppendData"
|| p.nodeName == "OverwriteByExpression"
|| p.nodeName == "OverwritePartitionsDynamic" =>
- val table = getField[NamedRelation](plan, "table").name
+ val table = getV2TableName(getField[NamedRelation](plan, "table"))
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
@@ -409,22 +411,22 @@ trait LineageParser {
joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
case p: LogicalRelation if p.catalogTable.nonEmpty =>
- val tableName = p.catalogTable.get.qualifiedName
+ val tableName = getV1TableName(p.catalogTable.get.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: HiveTableRelation =>
- val tableName = p.tableMeta.qualifiedName
+ val tableName = getV1TableName(p.tableMeta.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: DataSourceV2ScanRelation =>
- val tableName = p.name
+ val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
// For creating the view from v2 table, the logical plan of table will
// be the `DataSourceV2Relation` not the `DataSourceV2ScanRelation`.
// because the view from the table is not going to read it.
case p: DataSourceV2Relation =>
- val tableName = p.name
+ val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: LocalRelation =>
@@ -445,7 +447,7 @@ trait LineageParser {
case p: View =>
if (!p.isTempView && SparkContextHelper.getConf(
LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED)) {
- val viewName = p.desc.qualifiedName
+ val viewName = getV1TableName(p.desc.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(viewName))
} else {
val viewColumnsLineage =
@@ -476,6 +478,27 @@ trait LineageParser {
}
private def getQuery(plan: LogicalPlan): LogicalPlan =
getField[LogicalPlan](plan, "query")
+
+ private def getV2TableName(plan: NamedRelation): String = {
+ plan match {
+ case relation: DataSourceV2ScanRelation =>
+ val catalog = relation.relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
+ val database = relation.relation.identifier.get.namespace().mkString(".")
+ val table = relation.relation.identifier.get.name()
+ s"$catalog.$database.$table"
+ case relation: DataSourceV2Relation =>
+ val catalog = relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
+ val database = relation.identifier.get.namespace().mkString(".")
+ val table = relation.identifier.get.name()
+ s"$catalog.$database.$table"
+ case _ =>
+ plan.name
+ }
+ }
+
+ private def getV1TableName(qualifiedName: String): String = {
+ Seq(LineageConf.DEFAULT_CATALOG, qualifiedName).filter(_.nonEmpty).mkString(".")
+ }
}
case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends LineageParser
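
Together the two new helpers normalize table names before they are attached to the lineage: v1 names get the default catalog prepended, while v2 names are rebuilt from the relation's catalog and identifier (falling back to `LineageConf.DEFAULT_CATALOG`, or to `plan.name` for unrecognized relations). A minimal sketch of the v1 path, with "spark_catalog" standing in for `LineageConf.DEFAULT_CATALOG`:

    def getV1TableName(qualifiedName: String): String =
      Seq("spark_catalog", qualifiedName).filter(_.nonEmpty).mkString(".")

    getV1TableName("default.test_table0") // "spark_catalog.default.test_table0"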
diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 5b7d3dfe1..e264b1f35 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark.kyuubi.lineage
import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.plugin.lineage.LineageDispatcherType
@@ -45,4 +46,6 @@ object LineageConf {
"Unsupported lineage dispatchers")
.createWithDefault(Seq(LineageDispatcherType.SPARK_EVENT.toString))
+ val DEFAULT_CATALOG: String = SQLConf.get.getConf(SQLConf.DEFAULT_CATALOG)
+
}
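
Note that `DEFAULT_CATALOG` is resolved once from the active `SQLConf`. A sketch of what it evaluates to, assuming `spark.sql.defaultCatalog` is left at its built-in default:

    import org.apache.spark.sql.internal.SQLConf
    // SQLConf.DEFAULT_CATALOG is the entry for "spark.sql.defaultCatalog",
    // whose default value is "spark_catalog".
    val defaultCatalog: String = SQLConf.get.getConf(SQLConf.DEFAULT_CATALOG) // "spark_catalog"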
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
index 4d41b2a57..8e8d18f21 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
@@ -20,12 +20,11 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
import java.util
import scala.collection.JavaConverters._
-import scala.collection.immutable.List
import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
-import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
+import org.apache.spark.kyuubi.lineage.LineageConf.{DEFAULT_CATALOG, DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.spark.sql.SparkListenerExtensionTest
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -33,7 +32,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
-import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{buildColumnQualifiedName, buildTableQualifiedName, COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
@@ -67,11 +66,17 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
spark.sql("create table test_table1(a string, d int)")
spark.sql("insert into test_table1 select a, b + c as d from
test_table0").collect()
val expected = Lineage(
- List("default.test_table0"),
- List("default.test_table1"),
+ List(s"$DEFAULT_CATALOG.default.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.test_table1"),
List(
- ("default.test_table1.a", Set("default.test_table0.a")),
- ("default.test_table1.d", Set("default.test_table0.b",
"default.test_table0.c"))))
+ (
+ s"$DEFAULT_CATALOG.default.test_table1.a",
+ Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
+ (
+ s"$DEFAULT_CATALOG.default.test_table1.d",
+ Set(
+ s"$DEFAULT_CATALOG.default.test_table0.b",
+ s"$DEFAULT_CATALOG.default.test_table0.c"))))
eventually(Timeout(5.seconds)) {
assert(mockAtlasClient.getEntities != null && mockAtlasClient.getEntities.nonEmpty)
}
@@ -100,8 +105,10 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
- assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
- assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
+ assertResult(expected.inputTables.flatMap(buildTableQualifiedName(_).toSeq))(inputs)
+ assertResult(expected.outputTables.flatMap(buildTableQualifiedName(_).toSeq))(outputs)
}
def checkAtlasColumnLineageEntities(
@@ -114,18 +121,20 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExten
case (entity, expectedLineage) =>
assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
val expectedQualifiedName =
- s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
+ s"${processEntity.getAttribute("qualifiedName")}:" +
+ s"${buildColumnQualifiedName(expectedLineage.column).get}"
assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
assert(entity.getAttribute("name") == expectedQualifiedName)
val inputs = entity.getRelationshipAttribute("inputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
- assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet)
+ assertResult(expectedLineage.originalColumns.flatMap(buildColumnQualifiedName(_).toSet))(inputs.toSet)
val outputs = entity.getRelationshipAttribute("outputs")
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
assert(outputs.size == 1)
- assert(s"${expectedLineage.column}@$cluster" == outputs.head)
+ assert(buildColumnQualifiedName(expectedLineage.column).toSeq.head == outputs.head)
assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index ff0a55ff1..378eb3bb4 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -19,8 +19,6 @@ package org.apache.kyuubi.plugin.lineage.events
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import scala.collection.immutable.List
-
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.lineage.LineageConf._
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
@@ -82,11 +80,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
spark.sql("create table test_table0(a string, b string)")
spark.sql("select a as col0, b as col1 from test_table0").collect()
val expected = Lineage(
- List("default.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.test_table0"),
List(),
List(
- ("col0", Set("default.test_table0.a")),
- ("col1", Set("default.test_table0.b"))))
+ ("col0", Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
+ ("col1", Set(s"$DEFAULT_CATALOG.default.test_table0.b"))))
countDownLatch.await(20, TimeUnit.SECONDS)
assert(actualSparkEventLineage == expected)
assert(actualKyuubiEventLineage == expected)
@@ -97,11 +95,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
val countDownLatch = new CountDownLatch(1)
var executionId: Long = -1
val expected = Lineage(
- List("default.table1", "default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table1",
s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("aa", Set("default.table1.a")),
- ("bb", Set("default.table0.b"))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table0.b"))))
spark.sparkContext.addSparkListener(new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -163,11 +161,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtens
s" where a in ('HELLO') and c = 'HELLO'").collect()
val expected = Lineage(
- List("default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
List(),
List(
- ("k", Set("default.t2.a")),
- ("b", Set("default.t2.b"))))
+ ("k", Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.t2.b"))))
countDownLatch.await(20, TimeUnit.SECONDS)
assert(actual == expected)
}
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index d3cd41abc..3c19163db 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -38,6 +38,7 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
if (SPARK_RUNTIME_VERSION <= "3.1")
"org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+ val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG
override protected val catalogImpl: String = "hive"
override def sparkConf(): SparkConf = {
@@ -76,20 +77,26 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 =
extractLineage("alter view alterviewascommand as select key from
test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.alterviewascommand"),
- List(("default.alterviewascommand.key",
Set("test_db0.test_table0.key")))))
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.alterviewascommand"),
+ List((
+ s"$DEFAULT_CATALOG.default.alterviewascommand.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")))))
spark.sql("create view alterviewascommand1 as select * from
test_db0.test_table0")
val ret1 =
extractLineage("alter view alterviewascommand1 as select * from
test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
- List("default.alterviewascommand1"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.alterviewascommand1"),
List(
- ("default.alterviewascommand1.key", Set("test_db0.test_table0.key")),
- ("default.alterviewascommand1.value",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.alterviewascommand1.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.alterviewascommand1.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -106,11 +113,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
" select col1 as a, col2 as b, col3 as c from v2_catalog.db.tbb")
assert(result == Lineage(
List("v2_catalog.db.tbb"),
- List("default.test_view"),
+ List(s"$DEFAULT_CATALOG.default.test_view"),
List(
- ("default.test_view.a", Set("v2_catalog.db.tbb.col1")),
- ("default.test_view.b", Set("v2_catalog.db.tbb.col2")),
- ("default.test_view.c", Set("v2_catalog.db.tbb.col3")))))
+ (s"$DEFAULT_CATALOG.default.test_view.a",
Set("v2_catalog.db.tbb.col1")),
+ (s"$DEFAULT_CATALOG.default.test_view.b",
Set("v2_catalog.db.tbb.col2")),
+ (s"$DEFAULT_CATALOG.default.test_view.c",
Set("v2_catalog.db.tbb.col3")))))
}
}
@@ -126,32 +133,32 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"insert into table v2_catalog.db.tb0 " +
s"select key as col1, value as col2 from test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List("v2_catalog.db.tb0"),
List(
- ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
- ("v2_catalog.db.tb0.col2", Set("test_db0.test_table0.value")))))
+ ("v2_catalog.db.tb0.col1",
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ ("v2_catalog.db.tb0.col2",
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 =
extractLineage(
s"insert overwrite table v2_catalog.db.tb0 partition(col2) " +
s"select key as col1, value as col2 from test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List("v2_catalog.db.tb0"),
List(
- ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
- ("v2_catalog.db.tb0.col2", Set("test_db0.test_table0.value")))))
+ ("v2_catalog.db.tb0.col1",
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ ("v2_catalog.db.tb0.col2",
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret2 =
extractLineage(
s"insert overwrite table v2_catalog.db.tb0 partition(col2 = 'bb') " +
s"select key as col1 from test_db0.test_table0")
assert(ret2 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List("v2_catalog.db.tb0"),
List(
- ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
+ ("v2_catalog.db.tb0.col1",
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("v2_catalog.db.tb0.col2", Set()))))
}
}
@@ -220,29 +227,41 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 = extractLineage(
"create view createviewcommand(a, b) as select key, value from
test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.createviewcommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createviewcommand"),
List(
- ("default.createviewcommand.a", Set("test_db0.test_table0.key")),
- ("default.createviewcommand.b", Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 = extractLineage(
"create view createviewcommand1 as select key, value from
test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
- List("default.createviewcommand1"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createviewcommand1"),
List(
- ("default.createviewcommand1.key", Set("test_db0.test_table0.key")),
- ("default.createviewcommand1.value",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand1.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand1.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret2 = extractLineage(
"create view createviewcommand2 as select * from test_db0.test_table0")
assert(ret2 == Lineage(
- List("test_db0.test_table0"),
- List("default.createviewcommand2"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createviewcommand2"),
List(
- ("default.createviewcommand2.key", Set("test_db0.test_table0.key")),
- ("default.createviewcommand2.value",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand2.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createviewcommand2.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -253,25 +272,29 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
extractLineage("create table createdatasourcetableasselectcommand
using parquet" +
" AS SELECT key, value FROM test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.createdatasourcetableasselectcommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand"),
List(
- ("default.createdatasourcetableasselectcommand.key",
Set("test_db0.test_table0.key")),
(
- "default.createdatasourcetableasselectcommand.value",
- Set("test_db0.test_table0.value")))))
+ s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 =
extractLineage("create table createdatasourcetableasselectcommand1
using parquet" +
" AS SELECT * FROM test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
- List("default.createdatasourcetableasselectcommand1"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1"),
List(
- ("default.createdatasourcetableasselectcommand1.key",
Set("test_db0.test_table0.key")),
(
- "default.createdatasourcetableasselectcommand1.value",
- Set("test_db0.test_table0.value")))))
+ s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -280,20 +303,28 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 = extractLineage("create table createhivetableasselectcommand
using hive" +
" as select key, value from test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.createhivetableasselectcommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createhivetableasselectcommand"),
List(
- ("default.createhivetableasselectcommand.key",
Set("test_db0.test_table0.key")),
- ("default.createhivetableasselectcommand.value",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.createhivetableasselectcommand.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createhivetableasselectcommand.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 = extractLineage("create table createhivetableasselectcommand1
using hive" +
" as select * from test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
- List("default.createhivetableasselectcommand1"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1"),
List(
- ("default.createhivetableasselectcommand1.key",
Set("test_db0.test_table0.key")),
- ("default.createhivetableasselectcommand1.value",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -304,13 +335,15 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
"create table optimizedcreatehivetableasselectcommand stored as
parquet " +
"as select * from test_db0.test_table0")
assert(ret == Lineage(
- List("test_db0.test_table0"),
- List("default.optimizedcreatehivetableasselectcommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand"),
List(
- ("default.optimizedcreatehivetableasselectcommand.key", Set("test_db0.test_table0.key")),
(
- "default.optimizedcreatehivetableasselectcommand.value",
- Set("test_db0.test_table0.value")))))
+ s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand.value",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -321,24 +354,28 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 = extractLineage("create table
v2_catalog.db.createhivetableasselectcommand" +
" as select key, value from test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List("v2_catalog.db.createhivetableasselectcommand"),
List(
- ("v2_catalog.db.createhivetableasselectcommand.key",
Set("test_db0.test_table0.key")),
+ (
+ "v2_catalog.db.createhivetableasselectcommand.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
(
"v2_catalog.db.createhivetableasselectcommand.value",
- Set("test_db0.test_table0.value")))))
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 = extractLineage("create table
v2_catalog.db.createhivetableasselectcommand1" +
" as select * from test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List("v2_catalog.db.createhivetableasselectcommand1"),
List(
- ("v2_catalog.db.createhivetableasselectcommand1.key",
Set("test_db0.test_table0.key")),
+ (
+ "v2_catalog.db.createhivetableasselectcommand1.key",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
(
"v2_catalog.db.createhivetableasselectcommand1.value",
- Set("test_db0.test_table0.value")))))
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -366,21 +403,29 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
extractLineage(
s"insert into table $tableName select key, value from
test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.insertintodatasourcecommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
List(
- ("default.insertintodatasourcecommand.a",
Set("test_db0.test_table0.key")),
- ("default.insertintodatasourcecommand.b",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 =
extractLineage(
s"insert into table $tableName select * from test_db0.test_table0")
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
- List("default.insertintodatasourcecommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
List(
- ("default.insertintodatasourcecommand.a",
Set("test_db0.test_table0.key")),
- ("default.insertintodatasourcecommand.b",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret2 =
extractLineage(
@@ -388,11 +433,15 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"select (select key from test_db0.test_table1 limit 1) + 1 as aa,
" +
s"value as bb from test_db0.test_table0")
assert(ret2 == Lineage(
- List("test_db0.test_table1", "test_db0.test_table0"),
- List("default.insertintodatasourcecommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table1",
s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
List(
- ("default.insertintodatasourcecommand.a",
Set("test_db0.test_table1.key")),
- ("default.insertintodatasourcecommand.b",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table1.key")),
+ (
+ s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -406,11 +455,15 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"insert into table $tableName select key, value from
test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List("default.insertintohadoopfsrelationcommand"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand"),
List(
- ("default.insertintohadoopfsrelationcommand.a",
Set("test_db0.test_table0.key")),
- ("default.insertintohadoopfsrelationcommand.b",
Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -423,12 +476,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|USING parquet
|SELECT * FROM test_db0.test_table_part0""".stripMargin)
assert(ret0 == Lineage(
- List("test_db0.test_table_part0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
List(s"""`$directory.path`"""),
List(
- (s"""`$directory.path`.key""", Set("test_db0.test_table_part0.key")),
- (s"""`$directory.path`.value""",
Set("test_db0.test_table_part0.value")),
- (s"""`$directory.path`.pid""", Set("test_db0.test_table_part0.pid")))))
+ (s"""`$directory.path`.key""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+ (s"""`$directory.path`.value""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+ (s"""`$directory.path`.pid""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
}
test("columns lineage extract - InsertIntoHiveDirCommand") {
@@ -439,12 +492,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|USING parquet
|SELECT * FROM test_db0.test_table_part0""".stripMargin)
assert(ret0 == Lineage(
- List("test_db0.test_table_part0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
List(s"""`$directory.path`"""),
List(
- (s"""`$directory.path`.key""", Set("test_db0.test_table_part0.key")),
- (s"""`$directory.path`.value""",
Set("test_db0.test_table_part0.value")),
- (s"""`$directory.path`.pid""", Set("test_db0.test_table_part0.pid")))))
+ (s"""`$directory.path`.key""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+ (s"""`$directory.path`.value""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+ (s"""`$directory.path`.pid""",
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
}
test("columns lineage extract - InsertIntoHiveTable") {
@@ -456,11 +509,15 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"insert into table $tableName select * from test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
- List(s"default.$tableName"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.default.$tableName"),
List(
- (s"default.$tableName.a", Set("test_db0.test_table0.key")),
- (s"default.$tableName.b", Set("test_db0.test_table0.value")))))
+ (
+ s"$DEFAULT_CATALOG.default.$tableName.a",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ (
+ s"$DEFAULT_CATALOG.default.$tableName.b",
+ Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
}
}
@@ -468,20 +525,20 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
test("columns lineage extract - logical relation sql") {
val ret0 = extractLineage("select key, value from test_db0.test_table0")
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("key", Set("test_db0.test_table0.key")),
- ("value", Set("test_db0.test_table0.value")))))
+ ("key", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ ("value", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
val ret1 = extractLineage("select * from test_db0.test_table_part0")
assert(ret1 == Lineage(
- List("test_db0.test_table_part0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
List(),
List(
- ("key", Set("test_db0.test_table_part0.key")),
- ("value", Set("test_db0.test_table_part0.value")),
- ("pid", Set("test_db0.test_table_part0.pid")))))
+ ("key", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+ ("value", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+ ("pid", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
}
@@ -578,22 +635,22 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|select tmp0_0 as a0, tmp1_0 as a1 from tmp0 join tmp1 where tmp1_0 = tmp0_0
|""".stripMargin
val sql0ExpectResult = Lineage(
- List("default.tmp0", "default.tmp1"),
+ List(s"$DEFAULT_CATALOG.default.tmp0",
s"$DEFAULT_CATALOG.default.tmp1"),
List(),
List(
- "a0" -> Set("default.tmp0.tmp0_0"),
- "a1" -> Set("default.tmp1.tmp1_0")))
+ "a0" -> Set(s"$DEFAULT_CATALOG.default.tmp0.tmp0_0"),
+ "a1" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_0")))
val sql1 =
"""
|select count(tmp1_0) as cnt, tmp1_1 from tmp1 group by tmp1_1
|""".stripMargin
val sql1ExpectResult = Lineage(
- List("default.tmp1"),
+ List(s"$DEFAULT_CATALOG.default.tmp1"),
List(),
List(
- "cnt" -> Set("default.tmp1.tmp1_0"),
- "tmp1_1" -> Set("default.tmp1.tmp1_1")))
+ "cnt" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_0"),
+ "tmp1_1" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_1")))
val ret0 = extractLineage(sql0)
assert(ret0 == sql0ExpectResult)
@@ -660,14 +717,14 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
List(
- "test_db.goods_detail0",
+ s"$DEFAULT_CATALOG.test_db.goods_detail0",
"v2_catalog.test_db_v2.goods_detail1",
"v2_catalog.test_db_v2.mall_icon_schedule",
"v2_catalog.test_db_v2.mall_icon"),
List(),
List(
- ("goods_id", Set("test_db.goods_detail0.goods_id")),
- ("cate_grory", Set("test_db.goods_detail0.cat_id")),
+ ("goods_id",
Set(s"$DEFAULT_CATALOG.test_db.goods_detail0.goods_id")),
+ ("cate_grory",
Set(s"$DEFAULT_CATALOG.test_db.goods_detail0.cat_id")),
("cat_id", Set("v2_catalog.test_db_v2.goods_detail1.cat_id")),
("product_id",
Set("v2_catalog.test_db_v2.goods_detail1.product_id")),
("start_time",
Set("v2_catalog.test_db_v2.mall_icon_schedule.start_time")),
@@ -728,12 +785,24 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
- List("test_db.test_table0", "test_db.test_table1"),
+ List(s"$DEFAULT_CATALOG.test_db.test_table0",
s"$DEFAULT_CATALOG.test_db.test_table1"),
List(),
List(
- ("a", Set("test_db.test_table0.a", "test_db.test_table1.a")),
- ("b", Set("test_db.test_table0.b", "test_db.test_table1.b")),
- ("c", Set("test_db.test_table0.b", "test_db.test_table1.c")))))
+ (
+ "a",
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_table0.a",
+ s"$DEFAULT_CATALOG.test_db.test_table1.a")),
+ (
+ "b",
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_table0.b",
+ s"$DEFAULT_CATALOG.test_db.test_table1.b")),
+ (
+ "c",
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_table0.b",
+ s"$DEFAULT_CATALOG.test_db.test_table1.c")))))
}
}
@@ -769,18 +838,20 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
- List("test_db.test_order_item"),
+ List(s"$DEFAULT_CATALOG.test_db.test_order_item"),
List(),
List(
- ("stat_date", Set("test_db.test_order_item.stat_date")),
- ("channel_id", Set("test_db.test_order_item.channel_id")),
- ("sub_channel_id", Set("test_db.test_order_item.sub_channel_id")),
- ("user_type", Set("test_db.test_order_item.user_type")),
- ("country_name", Set("test_db.test_order_item.country_name")),
- ("get_count0", Set("test_db.test_order_item.order_id")),
+ ("stat_date",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.stat_date")),
+ ("channel_id",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.channel_id")),
+ ("sub_channel_id",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.sub_channel_id")),
+ ("user_type",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.user_type")),
+ ("country_name",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.country_name")),
+ ("get_count0",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.order_id")),
(
"get_amount0",
- Set("test_db.test_order_item.goods_count",
"test_db.test_order_item.shop_price")),
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_order_item.goods_count",
+ s"$DEFAULT_CATALOG.test_db.test_order_item.shop_price")),
("add_time", Set[String]()))))
val sql1 =
"""
@@ -807,24 +878,32 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret1 = extractLineage(sql1)
assert(ret1 == Lineage(
- List("test_db.test_order_item", "test_db.test_p0_order_item"),
+ List(
+ s"$DEFAULT_CATALOG.test_db.test_order_item",
+ s"$DEFAULT_CATALOG.test_db.test_p0_order_item"),
List(),
List(
(
"channel_id",
- Set("test_db.test_order_item.channel_id",
"test_db.test_p0_order_item.channel_id")),
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_order_item.channel_id",
+ s"$DEFAULT_CATALOG.test_db.test_p0_order_item.channel_id")),
(
"sub_channel_id",
Set(
- "test_db.test_order_item.sub_channel_id",
- "test_db.test_p0_order_item.sub_channel_id")),
+ s"$DEFAULT_CATALOG.test_db.test_order_item.sub_channel_id",
+ s"$DEFAULT_CATALOG.test_db.test_p0_order_item.sub_channel_id")),
(
"country_name",
- Set("test_db.test_order_item.country_name",
"test_db.test_p0_order_item.country_name")),
- ("get_count0", Set("test_db.test_order_item.order_id")),
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_order_item.country_name",
+ s"$DEFAULT_CATALOG.test_db.test_p0_order_item.country_name")),
+ ("get_count0",
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.order_id")),
(
"get_amount0",
- Set("test_db.test_order_item.goods_count",
"test_db.test_order_item.shop_price")),
+ Set(
+ s"$DEFAULT_CATALOG.test_db.test_order_item.goods_count",
+ s"$DEFAULT_CATALOG.test_db.test_order_item.shop_price")),
("add_time", Set[String]()))))
}
}
@@ -833,56 +912,56 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val sql0 = """select key as a, count(*) as b, 1 as c from
test_db0.test_table0 group by key"""
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.key")),
- ("b", Set("test_db0.test_table0.__count__")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+ ("b", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
("c", Set()))))
val sql1 = """select count(*) as a, 1 as b from test_db0.test_table0"""
val ret1 = extractLineage(sql1)
assert(ret1 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.__count__")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
("b", Set()))))
val sql2 = """select every(key == 1) as a, 1 as b from
test_db0.test_table0"""
val ret2 = extractLineage(sql2)
assert(ret2 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.key")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("b", Set()))))
val sql3 = """select count(*) as a, 1 as b from test_db0.test_table0"""
val ret3 = extractLineage(sql3)
assert(ret3 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.__count__")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
("b", Set()))))
val sql4 = """select first(key) as a, 1 as b from test_db0.test_table0"""
val ret4 = extractLineage(sql4)
assert(ret4 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.key")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("b", Set()))))
val sql5 = """select avg(key) as a, 1 as b from test_db0.test_table0"""
val ret5 = extractLineage(sql5)
assert(ret5 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.key")),
+ ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("b", Set()))))
val sql6 =
@@ -890,19 +969,27 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
| 1 as b from test_db0.test_table0""".stripMargin
val ret6 = extractLineage(sql6)
assert(ret6 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.value", "test_db0.test_table0.key")),
+ (
+ "a",
+ Set(
+ s"$DEFAULT_CATALOG.test_db0.test_table0.value",
+ s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("b", Set()))))
val sql7 = """select count(*) + sum(key) as a, 1 as b from
test_db0.test_table0"""
val ret7 = extractLineage(sql7)
assert(ret7 == Lineage(
- List("test_db0.test_table0"),
+ List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
List(),
List(
- ("a", Set("test_db0.test_table0.__count__",
"test_db0.test_table0.key")),
+ (
+ "a",
+ Set(
+ s"$DEFAULT_CATALOG.test_db0.test_table0.__count__",
+ s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
("b", Set()))))
}
@@ -922,11 +1009,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
- List("default.table1", "default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table1",
s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("aa", Set("default.table1.a")),
- ("bb", Set("default.table0.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table0.b")))))
val df0 = spark.sql("select a as a0, b as b0 from table0 where a = 2")
df0.cache()
@@ -935,11 +1022,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val analyzed = df.queryExecution.analyzed
val ret1 = SparkSQLLineageParseHelper(spark).transformToLineage(0, analyzed).get
assert(ret1 == Lineage(
- List("default.table0", "default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table0",
s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("aa", Set("default.table0.a")),
- ("bb", Set("default.table1.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
}
}
@@ -957,12 +1044,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret0 = extractLineage(sql0)
assert(ret0 == Lineage(
- List("default.table0", "default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table0",
s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("aa", Set("default.table0.a")),
- ("bb", Set("default.table1.b")),
- ("cc", Set("default.table1.c")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")),
+ ("cc", Set(s"$DEFAULT_CATALOG.default.table1.c")))))
val sql1 =
"""
@@ -970,11 +1057,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret1 = extractLineage(sql1)
assert(ret1 == Lineage(
- List("default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("aa", Set("default.table1.a")),
- ("bb", Set("default.table1.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
val sql2 =
"""
@@ -982,11 +1069,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret2 = extractLineage(sql2)
assert(ret2 == Lineage(
- List("default.table0", "default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table0",
s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("aa", Set("default.table0.__count__")),
- ("bb", Set("default.table1.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table0.__count__")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
// ListQuery
val sql3 =
@@ -995,12 +1082,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret3 = extractLineage(sql3)
assert(ret3 == Lineage(
- List("default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("a", Set("default.table0.a")),
- ("b", Set("default.table0.b")),
- ("c", Set("default.table0.c")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+ ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
// Exists
val sql4 =
@@ -1009,12 +1096,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret4 = extractLineage(sql4)
assert(ret4 == Lineage(
- List("default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("a", Set("default.table0.a")),
- ("b", Set("default.table0.b")),
- ("c", Set("default.table0.c")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+ ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
val sql5 =
"""
@@ -1022,12 +1109,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret5 = extractLineage(sql5)
assert(ret5 == Lineage(
- List("default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("a", Set("default.table0.a")),
- ("b", Set("default.table0.b")),
- ("c", Set("default.table0.c")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+ ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
val sql6 =
"""
@@ -1035,12 +1122,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret6 = extractLineage(sql6)
assert(ret6 == Lineage(
- List("default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("a", Set("default.table0.a")),
- ("b", Set("default.table0.b")),
- ("c", Set("default.table0.c")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+ ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
val sql7 =
"""
@@ -1048,12 +1135,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret7 = extractLineage(sql7)
assert(ret7 == Lineage(
- List("default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("a", Set("default.table0.a")),
- ("b", Set("default.table0.b")),
- ("c", Set("default.table0.c")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+ ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
val sql8 =
"""
@@ -1061,11 +1148,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret8 = extractLineage(sql8)
assert(ret8 == Lineage(
- List("default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("(scalarsubquery() + 1)", Set("default.table1.a")),
- ("bb", Set("default.table1.b")))))
+ ("(scalarsubquery() + 1)",
Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
val sql9 =
"""
@@ -1073,11 +1160,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret9 = extractLineage(sql9)
assert(ret9 == Lineage(
- List("default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("aa", Set("default.table1.a")),
- ("bb", Set("default.table1.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
val sql10 =
"""
@@ -1086,11 +1173,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
|""".stripMargin
val ret10 = extractLineage(sql10)
assert(ret10 == Lineage(
- List("default.table1", "default.table0"),
+ List(s"$DEFAULT_CATALOG.default.table1",
s"$DEFAULT_CATALOG.default.table0"),
List(),
List(
- ("aa", Set("default.table1.a", "default.table0.a")),
- ("bb", Set("default.table1.b")))))
+ ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a",
s"$DEFAULT_CATALOG.default.table0.a")),
+ ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
val sql11 =
"""
@@ -1099,11 +1186,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret11 = extractLineage(sql11)
assert(ret11 == Lineage(
- List("default.table1"),
+ List(s"$DEFAULT_CATALOG.default.table1"),
List(),
List(
- ("a", Set("default.table1.a")),
- ("b", Set("default.table1.b")))))
+ ("a", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
}
}
@@ -1120,12 +1207,14 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"count(distinct(b)) * count(distinct(c))" +
s"from t2 group by a")
assert(ret0 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.b")),
- ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+ (
+ s"$DEFAULT_CATALOG.default.t1.c",
+ Set(s"$DEFAULT_CATALOG.default.t2.b",
s"$DEFAULT_CATALOG.default.t2.c")))))
val ret1 =
extractLineage(
@@ -1166,12 +1255,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s"insert into table t1 select a,b,GROUPING__ID " +
s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
assert(ret0 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.b")),
- ("default.t1.c", Set()))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+ (s"$DEFAULT_CATALOG.default.t1.c", Set()))))
}
}
@@ -1186,19 +1275,21 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s" where rank=1")
val ret0 = extractLineage("insert overwrite table t1 select a, b from
c1")
assert(ret0 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.b")))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
val ret1 = extractLineage("insert overwrite table t1 select a, rank from
c1")
assert(ret1 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.a", "default.t2.b")))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (
+ s"$DEFAULT_CATALOG.default.t1.b",
+ Set(s"$DEFAULT_CATALOG.default.t2.a",
s"$DEFAULT_CATALOG.default.t2.b")))))
spark.sql(
s"cache table c2 select * from (" +
@@ -1206,11 +1297,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s" where rank=1")
val ret2 = extractLineage("insert overwrite table t1 select a, b from
c2")
assert(ret2 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.b")))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
spark.sql(
s"cache table c3 select * from (" +
@@ -1218,11 +1309,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s" from t2) where rank=1")
val ret3 = extractLineage("insert overwrite table t1 select aa, bb from
c3")
assert(ret3 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set("default.t2.a")),
- ("default.t1.b", Set("default.t2.b")))))
+ (s"$DEFAULT_CATALOG.default.t1.a",
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
}
}
@@ -1234,12 +1325,12 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
" ifnull(get_json_object(a, '$.b.imei'), get_json_object(a,
'$.b.android_id'))) from t2)")
assert(ret0 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set()),
- ("default.t1.b", Set()),
- ("default.t1.c", Set("default.t2.a")))))
+ (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+ (s"$DEFAULT_CATALOG.default.t1.b", Set()),
+ (s"$DEFAULT_CATALOG.default.t1.c",
Set(s"$DEFAULT_CATALOG.default.t2.a")))))
}
}
@@ -1255,11 +1346,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s" from t2" +
s" where a in ('HELLO') and c = 'HELLO'")
assert(ret0 == Lineage(
- List("default.t1"),
- List("default.view_tst"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
+ List(s"$DEFAULT_CATALOG.default.view_tst"),
List(
- ("default.view_tst.k", Set("default.t1.a")),
- ("default.view_tst.b", Set("default.t1.b")))))
+ (s"$DEFAULT_CATALOG.default.view_tst.k",
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+ (s"$DEFAULT_CATALOG.default.view_tst.b",
Set(s"$DEFAULT_CATALOG.default.t1.b")))))
}
}
}
@@ -1276,11 +1367,11 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
s" from t2" +
s" where a in ('HELLO') and c = 'HELLO'")
assert(ret0 == Lineage(
- List("default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
List(),
List(
- ("k", Set("default.t2.a")),
- ("b", Set("default.t2.b")))))
+ ("k", Set(s"$DEFAULT_CATALOG.default.t2.a")),
+ ("b", Set(s"$DEFAULT_CATALOG.default.t2.b")))))
}
}
}
@@ -1294,13 +1385,13 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
" insert overwrite table t2 select a,b where a=1" +
" insert overwrite table t3 select a,b where b=1")
assert(ret0 == Lineage(
- List("default.t1"),
- List("default.t2", "default.t3"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2", s"$DEFAULT_CATALOG.default.t3"),
List(
- ("default.t2.a", Set("default.t1.a")),
- ("default.t2.b", Set("default.t1.b")),
- ("default.t3.a", Set("default.t1.a")),
- ("default.t3.b", Set("default.t1.b")))))
+ (s"$DEFAULT_CATALOG.default.t2.a",
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+ (s"$DEFAULT_CATALOG.default.t2.b",
Set(s"$DEFAULT_CATALOG.default.t1.b")),
+ (s"$DEFAULT_CATALOG.default.t3.a",
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+ (s"$DEFAULT_CATALOG.default.t3.b",
Set(s"$DEFAULT_CATALOG.default.t1.b")))))
}
}
@@ -1312,38 +1403,38 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
val ret0 = extractLineage("insert into t1 select 1, t2.b, cc.action,
t2.d " +
"from t2 lateral view explode(split(c,'\\},\\{')) cc as action")
assert(ret0 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set()),
- ("default.t1.b", Set("default.t2.b")),
- ("default.t1.c", Set("default.t2.c")),
- ("default.t1.d", Set("default.t2.d")))))
+ (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+ (s"$DEFAULT_CATALOG.default.t1.c",
Set(s"$DEFAULT_CATALOG.default.t2.c")),
+ (s"$DEFAULT_CATALOG.default.t1.d",
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
val ret1 = extractLineage("insert into t1 select 1, t2.b, cc.action0,
dd.action1 " +
"from t2 " +
"lateral view explode(split(c,'\\},\\{')) cc as action0 " +
"lateral view explode(split(d,'\\},\\{')) dd as action1")
assert(ret1 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set()),
- ("default.t1.b", Set("default.t2.b")),
- ("default.t1.c", Set("default.t2.c")),
- ("default.t1.d", Set("default.t2.d")))))
+ (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+ (s"$DEFAULT_CATALOG.default.t1.c",
Set(s"$DEFAULT_CATALOG.default.t2.c")),
+ (s"$DEFAULT_CATALOG.default.t1.d",
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
val ret2 = extractLineage("insert into t1 select 1, t2.b, dd.pos,
dd.action1 " +
"from t2 " +
"lateral view posexplode(split(d,'\\},\\{')) dd as pos, action1")
assert(ret2 == Lineage(
- List("default.t2"),
- List("default.t1"),
+ List(s"$DEFAULT_CATALOG.default.t2"),
+ List(s"$DEFAULT_CATALOG.default.t1"),
List(
- ("default.t1.a", Set()),
- ("default.t1.b", Set("default.t2.b")),
- ("default.t1.c", Set("default.t2.d")),
- ("default.t1.d", Set("default.t2.d")))))
+ (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+ (s"$DEFAULT_CATALOG.default.t1.b",
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+ (s"$DEFAULT_CATALOG.default.t1.c",
Set(s"$DEFAULT_CATALOG.default.t2.d")),
+ (s"$DEFAULT_CATALOG.default.t1.d",
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
}
}