This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new bb671be2b [KYUUBI #4925] Add default catalog using `spark_catalog` 
with the lineage result
bb671be2b is described below

commit bb671be2bd8cd8a29cb996a81b319daf4cfefd6c
Author: odone <[email protected]>
AuthorDate: Wed Aug 9 18:28:32 2023 +0800

    [KYUUBI #4925] Add default catalog using `spark_catalog` with the lineage 
result
    
    ### _Why are the changes needed?_
    
    close #4925
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4932 from iodone/kyuubi-4868.
    
    Closes #4925
    
    ff3195772 [odone] remove the catalog with atlas supporting
    fda2fe321 [odone] add default catalog to v1 table when parsing lineage
    
    Authored-by: odone <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../dispatcher/atlas/AtlasEntityHelper.scala       |  45 +-
 .../helper/SparkSQLLineageParseHelper.scala        |  49 +-
 .../apache/spark/kyuubi/lineage/LineageConf.scala  |   3 +
 .../atlas/AtlasLineageDispatcherSuite.scala        |  33 +-
 .../events/OperationLineageEventSuite.scala        |  20 +-
 .../helper/SparkSQLLineageParserHelperSuite.scala  | 591 ++++++++++++---------
 6 files changed, 433 insertions(+), 308 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
index 9575b5258..cfa19b7aa 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
 import scala.collection.JavaConverters._
 
 import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, 
AtlasRelatedObjectId}
-import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.kyuubi.lineage.{LineageConf, SparkContextHelper}
 import org.apache.spark.sql.execution.QueryExecution
 
 import org.apache.kyuubi.plugin.lineage.Lineage
@@ -87,8 +87,9 @@ object AtlasEntityHelper {
 
       if (inputs.nonEmpty && outputs.nonEmpty) {
         val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
+        val outputColumnName = 
buildColumnQualifiedName(columnLineage.column).get
         val qualifiedName =
-          
s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
+          s"${processEntity.getAttribute("qualifiedName")}:${outputColumnName}"
         entity.setAttribute("qualifiedName", qualifiedName)
         entity.setAttribute("name", qualifiedName)
         entity.setRelationshipAttribute("inputs", inputs.asJava)
@@ -104,33 +105,33 @@ object AtlasEntityHelper {
   }
 
   def tableObjectId(tableName: String): Option[AtlasObjectId] = {
-    val dbTb = tableName.split('.')
-    if (dbTb.length == 2) {
-      val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
-      // TODO parse datasource type
-      Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
-    } else {
-      None
-    }
+    buildTableQualifiedName(tableName)
+      .map(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", _))
   }
 
-  def tableQualifiedName(cluster: String, db: String, table: String): String = 
{
-    s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
+  def buildTableQualifiedName(tableName: String): Option[String] = {
+    val defaultCatalog = LineageConf.DEFAULT_CATALOG
+    tableName.split('.') match {
+      case Array(`defaultCatalog`, db, table) =>
+        Some(s"${db.toLowerCase}.${table.toLowerCase}@$cluster")
+      case _ =>
+        None
+    }
   }
 
   def columnObjectId(columnName: String): Option[AtlasObjectId] = {
-    val dbTbCol = columnName.split('.')
-    if (dbTbCol.length == 3) {
-      val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), 
dbTbCol(2))
-      // TODO parse datasource type
-      Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
-    } else {
-      None
-    }
+    buildColumnQualifiedName(columnName)
+      .map(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", _))
   }
 
-  def columnQualifiedName(cluster: String, db: String, table: String, column: 
String): String = {
-    s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
+  def buildColumnQualifiedName(columnName: String): Option[String] = {
+    val defaultCatalog = LineageConf.DEFAULT_CATALOG
+    columnName.split('.') match {
+      case Array(`defaultCatalog`, db, table, column) =>
+        
Some(s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster")
+      case _ =>
+        None
+    }
   }
 
   def objectId(entity: AtlasEntity): AtlasObjectId = {
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index a5f805fa9..dad37eac5 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -199,7 +199,7 @@ trait LineageParser {
           } else {
             getQuery(plan)
           }
-        val view = getField[TableIdentifier](plan, "name").unquotedString
+        val view = getV1TableName(getField[TableIdentifier](plan, 
"name").unquotedString)
         extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
           k.withName(s"$view.${k.name}") -> v
         }
@@ -207,7 +207,7 @@ trait LineageParser {
       case p
           if p.nodeName == "CreateViewCommand"
             && getField[ViewType](plan, "viewType") == PersistedView =>
-        val view = getField[TableIdentifier](plan, "name").unquotedString
+        val view = getV1TableName(getField[TableIdentifier](plan, 
"name").unquotedString)
         val outputCols =
           getField[Seq[(String, Option[String])]](plan, 
"userSpecifiedColumns").map(_._1)
         val query =
@@ -223,7 +223,7 @@ trait LineageParser {
         }
 
       case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
-        val table = getField[CatalogTable](plan, "table").qualifiedName
+        val table = getV1TableName(getField[CatalogTable](plan, 
"table").qualifiedName)
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case 
(k, v) =>
           k.withName(s"$table.${k.name}") -> v
         }
@@ -231,7 +231,7 @@ trait LineageParser {
       case p
           if p.nodeName == "CreateHiveTableAsSelectCommand" ||
             p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
-        val table = getField[CatalogTable](plan, "tableDesc").qualifiedName
+        val table = getV1TableName(getField[CatalogTable](plan, 
"tableDesc").qualifiedName)
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case 
(k, v) =>
           k.withName(s"$table.${k.name}") -> v
         }
@@ -259,7 +259,8 @@ trait LineageParser {
 
       case p if p.nodeName == "InsertIntoDataSourceCommand" =>
         val logicalRelation = getField[LogicalRelation](plan, 
"logicalRelation")
-        val table = 
logicalRelation.catalogTable.map(_.qualifiedName).getOrElse("")
+        val table = logicalRelation
+          .catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
           case (k, v) if table.nonEmpty =>
             k.withName(s"$table.${k.name}") -> v
@@ -267,7 +268,8 @@ trait LineageParser {
 
       case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
         val table =
-          getField[Option[CatalogTable]](plan, 
"catalogTable").map(_.qualifiedName)
+          getField[Option[CatalogTable]](plan, "catalogTable")
+            .map(t => getV1TableName(t.qualifiedName))
             .getOrElse("")
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map {
           case (k, v) if table.nonEmpty =>
@@ -286,7 +288,7 @@ trait LineageParser {
         }
 
       case p if p.nodeName == "InsertIntoHiveTable" =>
-        val table = getField[CatalogTable](plan, "table").qualifiedName
+        val table = getV1TableName(getField[CatalogTable](plan, 
"table").qualifiedName)
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case 
(k, v) =>
           k.withName(s"$table.${k.name}") -> v
         }
@@ -298,7 +300,7 @@ trait LineageParser {
           if p.nodeName == "AppendData"
             || p.nodeName == "OverwriteByExpression"
             || p.nodeName == "OverwritePartitionsDynamic" =>
-        val table = getField[NamedRelation](plan, "table").name
+        val table = getV2TableName(getField[NamedRelation](plan, "table"))
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case 
(k, v) =>
           k.withName(s"$table.${k.name}") -> v
         }
@@ -409,22 +411,22 @@ trait LineageParser {
         joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
 
       case p: LogicalRelation if p.catalogTable.nonEmpty =>
-        val tableName = p.catalogTable.get.qualifiedName
+        val tableName = getV1TableName(p.catalogTable.get.qualifiedName)
         joinRelationColumnLineage(parentColumnsLineage, p.output, 
Seq(tableName))
 
       case p: HiveTableRelation =>
-        val tableName = p.tableMeta.qualifiedName
+        val tableName = getV1TableName(p.tableMeta.qualifiedName)
         joinRelationColumnLineage(parentColumnsLineage, p.output, 
Seq(tableName))
 
       case p: DataSourceV2ScanRelation =>
-        val tableName = p.name
+        val tableName = getV2TableName(p)
         joinRelationColumnLineage(parentColumnsLineage, p.output, 
Seq(tableName))
 
       // For creating the view from v2 table, the logical plan of table will
       // be the `DataSourceV2Relation` not the `DataSourceV2ScanRelation`.
       // because the view from the table is not going to read it.
       case p: DataSourceV2Relation =>
-        val tableName = p.name
+        val tableName = getV2TableName(p)
         joinRelationColumnLineage(parentColumnsLineage, p.output, 
Seq(tableName))
 
       case p: LocalRelation =>
@@ -445,7 +447,7 @@ trait LineageParser {
       case p: View =>
         if (!p.isTempView && SparkContextHelper.getConf(
             LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED)) {
-          val viewName = p.desc.qualifiedName
+          val viewName = getV1TableName(p.desc.qualifiedName)
           joinRelationColumnLineage(parentColumnsLineage, p.output, 
Seq(viewName))
         } else {
           val viewColumnsLineage =
@@ -476,6 +478,27 @@ trait LineageParser {
   }
 
   private def getQuery(plan: LogicalPlan): LogicalPlan = 
getField[LogicalPlan](plan, "query")
+
+  private def getV2TableName(plan: NamedRelation): String = {
+    plan match {
+      case relation: DataSourceV2ScanRelation =>
+        val catalog = 
relation.relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
+        val database = 
relation.relation.identifier.get.namespace().mkString(".")
+        val table = relation.relation.identifier.get.name()
+        s"$catalog.$database.$table"
+      case relation: DataSourceV2Relation =>
+        val catalog = 
relation.catalog.map(_.name()).getOrElse(LineageConf.DEFAULT_CATALOG)
+        val database = relation.identifier.get.namespace().mkString(".")
+        val table = relation.identifier.get.name()
+        s"$catalog.$database.$table"
+      case _ =>
+        plan.name
+    }
+  }
+
+  private def getV1TableName(qualifiedName: String): String = {
+    Seq(LineageConf.DEFAULT_CATALOG, 
qualifiedName).filter(_.nonEmpty).mkString(".")
+  }
 }
 
 case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends 
LineageParser
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 5b7d3dfe1..e264b1f35 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.kyuubi.lineage
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.kyuubi.plugin.lineage.LineageDispatcherType
 
@@ -45,4 +46,6 @@ object LineageConf {
       "Unsupported lineage dispatchers")
     .createWithDefault(Seq(LineageDispatcherType.SPARK_EVENT.toString))
 
+  val DEFAULT_CATALOG: String = SQLConf.get.getConf(SQLConf.DEFAULT_CATALOG)
+
 }
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
index 4d41b2a57..8e8d18f21 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
@@ -20,12 +20,11 @@ package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.collection.immutable.List
 
 import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.SparkConf
-import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, 
SKIP_PARSING_PERMANENT_VIEW_ENABLED}
+import org.apache.spark.kyuubi.lineage.LineageConf.{DEFAULT_CATALOG, 
DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
 import org.apache.spark.kyuubi.lineage.SparkContextHelper
 import org.apache.spark.sql.SparkListenerExtensionTest
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -33,7 +32,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.plugin.lineage.Lineage
-import 
org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE,
 PROCESS_TYPE}
+import 
org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{buildColumnQualifiedName,
 buildTableQualifiedName, COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
 import 
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
 
 class AtlasLineageDispatcherSuite extends KyuubiFunSuite with 
SparkListenerExtensionTest {
@@ -67,11 +66,17 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite 
with SparkListenerExten
       spark.sql("create table test_table1(a string, d int)")
       spark.sql("insert into test_table1 select a, b + c as d from 
test_table0").collect()
       val expected = Lineage(
-        List("default.test_table0"),
-        List("default.test_table1"),
+        List(s"$DEFAULT_CATALOG.default.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.test_table1"),
         List(
-          ("default.test_table1.a", Set("default.test_table0.a")),
-          ("default.test_table1.d", Set("default.test_table0.b", 
"default.test_table0.c"))))
+          (
+            s"$DEFAULT_CATALOG.default.test_table1.a",
+            Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
+          (
+            s"$DEFAULT_CATALOG.default.test_table1.d",
+            Set(
+              s"$DEFAULT_CATALOG.default.test_table0.b",
+              s"$DEFAULT_CATALOG.default.test_table0.c"))))
       eventually(Timeout(5.seconds)) {
         assert(mockAtlasClient.getEntities != null && 
mockAtlasClient.getEntities.nonEmpty)
       }
@@ -100,8 +105,10 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite 
with SparkListenerExten
       
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
     val outputs = entity.getRelationshipAttribute("outputs")
       
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
-    assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
-    assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
+    assertResult(expected.inputTables
+      .flatMap(buildTableQualifiedName(_).toSeq))(inputs)
+    assertResult(expected.outputTables
+      .flatMap(buildTableQualifiedName(_).toSeq))(outputs)
   }
 
   def checkAtlasColumnLineageEntities(
@@ -114,18 +121,20 @@ class AtlasLineageDispatcherSuite extends KyuubiFunSuite 
with SparkListenerExten
       case (entity, expectedLineage) =>
         assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
         val expectedQualifiedName =
-          
s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
+          s"${processEntity.getAttribute("qualifiedName")}:" +
+            s"${buildColumnQualifiedName(expectedLineage.column).get}"
         assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
         assert(entity.getAttribute("name") == expectedQualifiedName)
 
         val inputs = entity.getRelationshipAttribute("inputs")
           
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
-        assertResult(expectedLineage.originalColumns.map(s => 
s"$s@$cluster"))(inputs.toSet)
+        assertResult(expectedLineage.originalColumns
+          .flatMap(buildColumnQualifiedName(_).toSet))(inputs.toSet)
 
         val outputs = entity.getRelationshipAttribute("outputs")
           
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
         assert(outputs.size == 1)
-        assert(s"${expectedLineage.column}@$cluster" == outputs.head)
+        assert(buildColumnQualifiedName(expectedLineage.column).toSeq.head == 
outputs.head)
 
         
assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
           AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
index ff0a55ff1..378eb3bb4 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/events/OperationLineageEventSuite.scala
@@ -19,8 +19,6 @@ package org.apache.kyuubi.plugin.lineage.events
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import scala.collection.immutable.List
-
 import org.apache.spark.SparkConf
 import org.apache.spark.kyuubi.lineage.LineageConf._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
@@ -82,11 +80,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite 
with SparkListenerExtens
       spark.sql("create table test_table0(a string, b string)")
       spark.sql("select a as col0, b as col1 from test_table0").collect()
       val expected = Lineage(
-        List("default.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.test_table0"),
         List(),
         List(
-          ("col0", Set("default.test_table0.a")),
-          ("col1", Set("default.test_table0.b"))))
+          ("col0", Set(s"$DEFAULT_CATALOG.default.test_table0.a")),
+          ("col1", Set(s"$DEFAULT_CATALOG.default.test_table0.b"))))
       countDownLatch.await(20, TimeUnit.SECONDS)
       assert(actualSparkEventLineage == expected)
       assert(actualKyuubiEventLineage == expected)
@@ -97,11 +95,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite 
with SparkListenerExtens
     val countDownLatch = new CountDownLatch(1)
     var executionId: Long = -1
     val expected = Lineage(
-      List("default.table1", "default.table0"),
+      List(s"$DEFAULT_CATALOG.default.table1", 
s"$DEFAULT_CATALOG.default.table0"),
       List(),
       List(
-        ("aa", Set("default.table1.a")),
-        ("bb", Set("default.table0.b"))))
+        ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+        ("bb", Set(s"$DEFAULT_CATALOG.default.table0.b"))))
 
     spark.sparkContext.addSparkListener(new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -163,11 +161,11 @@ class OperationLineageEventSuite extends KyuubiFunSuite 
with SparkListenerExtens
           s" where a in ('HELLO') and c = 'HELLO'").collect()
 
       val expected = Lineage(
-        List("default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
         List(),
         List(
-          ("k", Set("default.t2.a")),
-          ("b", Set("default.t2.b"))))
+          ("k", Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.t2.b"))))
       countDownLatch.await(20, TimeUnit.SECONDS)
       assert(actual == expected)
     }
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index d3cd41abc..3c19163db 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -38,6 +38,7 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
     if (SPARK_RUNTIME_VERSION <= "3.1") 
"org.apache.spark.sql.connector.InMemoryTableCatalog"
     else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
 
+  val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG
   override protected val catalogImpl: String = "hive"
 
   override def sparkConf(): SparkConf = {
@@ -76,20 +77,26 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 =
         extractLineage("alter view alterviewascommand as select key from 
test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.alterviewascommand"),
-        List(("default.alterviewascommand.key", 
Set("test_db0.test_table0.key")))))
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.alterviewascommand"),
+        List((
+          s"$DEFAULT_CATALOG.default.alterviewascommand.key",
+          Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")))))
 
       spark.sql("create view alterviewascommand1 as select * from 
test_db0.test_table0")
       val ret1 =
         extractLineage("alter view alterviewascommand1 as select * from 
test_db0.test_table0")
 
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.alterviewascommand1"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.alterviewascommand1"),
         List(
-          ("default.alterviewascommand1.key", Set("test_db0.test_table0.key")),
-          ("default.alterviewascommand1.value", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.alterviewascommand1.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.alterviewascommand1.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -106,11 +113,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           " select col1 as a, col2 as b, col3 as c from v2_catalog.db.tbb")
       assert(result == Lineage(
         List("v2_catalog.db.tbb"),
-        List("default.test_view"),
+        List(s"$DEFAULT_CATALOG.default.test_view"),
         List(
-          ("default.test_view.a", Set("v2_catalog.db.tbb.col1")),
-          ("default.test_view.b", Set("v2_catalog.db.tbb.col2")),
-          ("default.test_view.c", Set("v2_catalog.db.tbb.col3")))))
+          (s"$DEFAULT_CATALOG.default.test_view.a", 
Set("v2_catalog.db.tbb.col1")),
+          (s"$DEFAULT_CATALOG.default.test_view.b", 
Set("v2_catalog.db.tbb.col2")),
+          (s"$DEFAULT_CATALOG.default.test_view.c", 
Set("v2_catalog.db.tbb.col3")))))
     }
   }
 
@@ -126,32 +133,32 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s"insert into table v2_catalog.db.tb0 " +
             s"select key as col1, value as col2 from test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
         List("v2_catalog.db.tb0"),
         List(
-          ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
-          ("v2_catalog.db.tb0.col2", Set("test_db0.test_table0.value")))))
+          ("v2_catalog.db.tb0.col1", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          ("v2_catalog.db.tb0.col2", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret1 =
         extractLineage(
           s"insert overwrite table v2_catalog.db.tb0 partition(col2) " +
             s"select key as col1, value as col2 from test_db0.test_table0")
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
         List("v2_catalog.db.tb0"),
         List(
-          ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
-          ("v2_catalog.db.tb0.col2", Set("test_db0.test_table0.value")))))
+          ("v2_catalog.db.tb0.col1", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          ("v2_catalog.db.tb0.col2", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret2 =
         extractLineage(
           s"insert overwrite table v2_catalog.db.tb0 partition(col2 = 'bb') " +
             s"select key as col1 from test_db0.test_table0")
       assert(ret2 == Lineage(
-        List("test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
         List("v2_catalog.db.tb0"),
         List(
-          ("v2_catalog.db.tb0.col1", Set("test_db0.test_table0.key")),
+          ("v2_catalog.db.tb0.col1", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
           ("v2_catalog.db.tb0.col2", Set()))))
     }
   }
@@ -220,29 +227,41 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 = extractLineage(
         "create view createviewcommand(a, b) as select key, value from 
test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.createviewcommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.createviewcommand"),
         List(
-          ("default.createviewcommand.a", Set("test_db0.test_table0.key")),
-          ("default.createviewcommand.b", Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret1 = extractLineage(
         "create view createviewcommand1 as select key, value from 
test_db0.test_table0")
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.createviewcommand1"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.createviewcommand1"),
         List(
-          ("default.createviewcommand1.key", Set("test_db0.test_table0.key")),
-          ("default.createviewcommand1.value", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand1.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand1.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret2 = extractLineage(
         "create view createviewcommand2 as select * from test_db0.test_table0")
       assert(ret2 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.createviewcommand2"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.createviewcommand2"),
         List(
-          ("default.createviewcommand2.key", Set("test_db0.test_table0.key")),
-          ("default.createviewcommand2.value", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand2.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.createviewcommand2.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -253,25 +272,29 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           extractLineage("create table createdatasourcetableasselectcommand 
using parquet" +
             " AS SELECT key, value FROM test_db0.test_table0")
         assert(ret0 == Lineage(
-          List("test_db0.test_table0"),
-          List("default.createdatasourcetableasselectcommand"),
+          List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+          
List(s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand"),
           List(
-            ("default.createdatasourcetableasselectcommand.key", 
Set("test_db0.test_table0.key")),
             (
-              "default.createdatasourcetableasselectcommand.value",
-              Set("test_db0.test_table0.value")))))
+              
s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand.key",
+              Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+            (
+              
s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand.value",
+              Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
         val ret1 =
           extractLineage("create table createdatasourcetableasselectcommand1 
using parquet" +
             " AS SELECT * FROM test_db0.test_table0")
         assert(ret1 == Lineage(
-          List("test_db0.test_table0"),
-          List("default.createdatasourcetableasselectcommand1"),
+          List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+          
List(s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1"),
           List(
-            ("default.createdatasourcetableasselectcommand1.key", 
Set("test_db0.test_table0.key")),
             (
-              "default.createdatasourcetableasselectcommand1.value",
-              Set("test_db0.test_table0.value")))))
+              
s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1.key",
+              Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+            (
+              
s"$DEFAULT_CATALOG.default.createdatasourcetableasselectcommand1.value",
+              Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -280,20 +303,28 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 = extractLineage("create table createhivetableasselectcommand 
using hive" +
         " as select key, value from test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.createhivetableasselectcommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.createhivetableasselectcommand"),
         List(
-          ("default.createhivetableasselectcommand.key", 
Set("test_db0.test_table0.key")),
-          ("default.createhivetableasselectcommand.value", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.createhivetableasselectcommand.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.createhivetableasselectcommand.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret1 = extractLineage("create table createhivetableasselectcommand1 
using hive" +
         " as select * from test_db0.test_table0")
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.createhivetableasselectcommand1"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1"),
         List(
-          ("default.createhivetableasselectcommand1.key", 
Set("test_db0.test_table0.key")),
-          ("default.createhivetableasselectcommand1.value", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.createhivetableasselectcommand1.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -304,13 +335,15 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           "create table optimizedcreatehivetableasselectcommand stored as 
parquet " +
             "as select * from test_db0.test_table0")
       assert(ret == Lineage(
-        List("test_db0.test_table0"),
-        List("default.optimizedcreatehivetableasselectcommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        
List(s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand"),
         List(
-          ("default.optimizedcreatehivetableasselectcommand.key", 
Set("test_db0.test_table0.key")),
           (
-            "default.optimizedcreatehivetableasselectcommand.value",
-            Set("test_db0.test_table0.value")))))
+            
s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            
s"$DEFAULT_CATALOG.default.optimizedcreatehivetableasselectcommand.value",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -321,24 +354,28 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 = extractLineage("create table 
v2_catalog.db.createhivetableasselectcommand" +
         " as select key, value from test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
         List("v2_catalog.db.createhivetableasselectcommand"),
         List(
-          ("v2_catalog.db.createhivetableasselectcommand.key", 
Set("test_db0.test_table0.key")),
+          (
+            "v2_catalog.db.createhivetableasselectcommand.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
           (
             "v2_catalog.db.createhivetableasselectcommand.value",
-            Set("test_db0.test_table0.value")))))
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret1 = extractLineage("create table 
v2_catalog.db.createhivetableasselectcommand1" +
         " as select * from test_db0.test_table0")
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
         List("v2_catalog.db.createhivetableasselectcommand1"),
         List(
-          ("v2_catalog.db.createhivetableasselectcommand1.key", 
Set("test_db0.test_table0.key")),
+          (
+            "v2_catalog.db.createhivetableasselectcommand1.key",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
           (
             "v2_catalog.db.createhivetableasselectcommand1.value",
-            Set("test_db0.test_table0.value")))))
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
   }
 
@@ -366,21 +403,29 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         extractLineage(
           s"insert into table  $tableName select key, value from 
test_db0.test_table0")
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.insertintodatasourcecommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
         List(
-          ("default.insertintodatasourcecommand.a", 
Set("test_db0.test_table0.key")),
-          ("default.insertintodatasourcecommand.b", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret1 =
         extractLineage(
           s"insert into table  $tableName select * from test_db0.test_table0")
       assert(ret1 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.insertintodatasourcecommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
         List(
-          ("default.insertintodatasourcecommand.a", 
Set("test_db0.test_table0.key")),
-          ("default.insertintodatasourcecommand.b", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
       val ret2 =
         extractLineage(
@@ -388,11 +433,15 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
             s"select (select key from test_db0.test_table1 limit 1) + 1 as aa, 
" +
             s"value as bb from test_db0.test_table0")
       assert(ret2 == Lineage(
-        List("test_db0.test_table1", "test_db0.test_table0"),
-        List("default.insertintodatasourcecommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table1", 
s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.insertintodatasourcecommand"),
         List(
-          ("default.insertintodatasourcecommand.a", 
Set("test_db0.test_table1.key")),
-          ("default.insertintodatasourcecommand.b", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table1.key")),
+          (
+            s"$DEFAULT_CATALOG.default.insertintodatasourcecommand.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
     }
   }
@@ -406,11 +455,15 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s"insert into table $tableName select key, value from 
test_db0.test_table0")
 
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List("default.insertintohadoopfsrelationcommand"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand"),
         List(
-          ("default.insertintohadoopfsrelationcommand.a", 
Set("test_db0.test_table0.key")),
-          ("default.insertintohadoopfsrelationcommand.b", 
Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.insertintohadoopfsrelationcommand.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
 
   }
@@ -423,12 +476,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
                                  |USING parquet
                                  |SELECT * FROM 
test_db0.test_table_part0""".stripMargin)
     assert(ret0 == Lineage(
-      List("test_db0.test_table_part0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
       List(s"""`$directory.path`"""),
       List(
-        (s"""`$directory.path`.key""", Set("test_db0.test_table_part0.key")),
-        (s"""`$directory.path`.value""", 
Set("test_db0.test_table_part0.value")),
-        (s"""`$directory.path`.pid""", Set("test_db0.test_table_part0.pid")))))
+        (s"""`$directory.path`.key""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+        (s"""`$directory.path`.value""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+        (s"""`$directory.path`.pid""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
   }
 
   test("columns lineage extract - InsertIntoHiveDirCommand") {
@@ -439,12 +492,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
                                  |USING parquet
                                  |SELECT * FROM 
test_db0.test_table_part0""".stripMargin)
     assert(ret0 == Lineage(
-      List("test_db0.test_table_part0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
       List(s"""`$directory.path`"""),
       List(
-        (s"""`$directory.path`.key""", Set("test_db0.test_table_part0.key")),
-        (s"""`$directory.path`.value""", 
Set("test_db0.test_table_part0.value")),
-        (s"""`$directory.path`.pid""", Set("test_db0.test_table_part0.pid")))))
+        (s"""`$directory.path`.key""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+        (s"""`$directory.path`.value""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+        (s"""`$directory.path`.pid""", 
Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
   }
 
   test("columns lineage extract - InsertIntoHiveTable") {
@@ -456,11 +509,15 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s"insert into table $tableName select * from test_db0.test_table0")
 
       assert(ret0 == Lineage(
-        List("test_db0.test_table0"),
-        List(s"default.$tableName"),
+        List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
+        List(s"$DEFAULT_CATALOG.default.$tableName"),
         List(
-          (s"default.$tableName.a", Set("test_db0.test_table0.key")),
-          (s"default.$tableName.b", Set("test_db0.test_table0.value")))))
+          (
+            s"$DEFAULT_CATALOG.default.$tableName.a",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+          (
+            s"$DEFAULT_CATALOG.default.$tableName.b",
+            Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
     }
 
   }
@@ -468,20 +525,20 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
   test("columns lineage extract - logical relation sql") {
     val ret0 = extractLineage("select key, value from test_db0.test_table0")
     assert(ret0 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("key", Set("test_db0.test_table0.key")),
-        ("value", Set("test_db0.test_table0.value")))))
+        ("key", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+        ("value", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.value")))))
 
     val ret1 = extractLineage("select * from test_db0.test_table_part0")
     assert(ret1 == Lineage(
-      List("test_db0.test_table_part0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table_part0"),
       List(),
       List(
-        ("key", Set("test_db0.test_table_part0.key")),
-        ("value", Set("test_db0.test_table_part0.value")),
-        ("pid", Set("test_db0.test_table_part0.pid")))))
+        ("key", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.key")),
+        ("value", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.value")),
+        ("pid", Set(s"$DEFAULT_CATALOG.test_db0.test_table_part0.pid")))))
 
   }
 
@@ -578,22 +635,22 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |select tmp0_0 as a0, tmp1_0 as a1 from tmp0 join tmp1 where tmp1_0 
= tmp0_0
           |""".stripMargin
       val sql0ExpectResult = Lineage(
-        List("default.tmp0", "default.tmp1"),
+        List(s"$DEFAULT_CATALOG.default.tmp0", 
s"$DEFAULT_CATALOG.default.tmp1"),
         List(),
         List(
-          "a0" -> Set("default.tmp0.tmp0_0"),
-          "a1" -> Set("default.tmp1.tmp1_0")))
+          "a0" -> Set(s"$DEFAULT_CATALOG.default.tmp0.tmp0_0"),
+          "a1" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_0")))
 
       val sql1 =
         """
           |select count(tmp1_0) as cnt, tmp1_1 from tmp1 group by tmp1_1
           |""".stripMargin
       val sql1ExpectResult = Lineage(
-        List("default.tmp1"),
+        List(s"$DEFAULT_CATALOG.default.tmp1"),
         List(),
         List(
-          "cnt" -> Set("default.tmp1.tmp1_0"),
-          "tmp1_1" -> Set("default.tmp1.tmp1_1")))
+          "cnt" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_0"),
+          "tmp1_1" -> Set(s"$DEFAULT_CATALOG.default.tmp1.tmp1_1")))
 
       val ret0 = extractLineage(sql0)
       assert(ret0 == sql0ExpectResult)
@@ -660,14 +717,14 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 = extractLineage(sql0)
       assert(ret0 == Lineage(
         List(
-          "test_db.goods_detail0",
+          s"$DEFAULT_CATALOG.test_db.goods_detail0",
           "v2_catalog.test_db_v2.goods_detail1",
           "v2_catalog.test_db_v2.mall_icon_schedule",
           "v2_catalog.test_db_v2.mall_icon"),
         List(),
         List(
-          ("goods_id", Set("test_db.goods_detail0.goods_id")),
-          ("cate_grory", Set("test_db.goods_detail0.cat_id")),
+          ("goods_id", 
Set(s"$DEFAULT_CATALOG.test_db.goods_detail0.goods_id")),
+          ("cate_grory", 
Set(s"$DEFAULT_CATALOG.test_db.goods_detail0.cat_id")),
           ("cat_id", Set("v2_catalog.test_db_v2.goods_detail1.cat_id")),
           ("product_id", 
Set("v2_catalog.test_db_v2.goods_detail1.product_id")),
           ("start_time", 
Set("v2_catalog.test_db_v2.mall_icon_schedule.start_time")),
@@ -728,12 +785,24 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret0 = extractLineage(sql0)
       assert(ret0 == Lineage(
-        List("test_db.test_table0", "test_db.test_table1"),
+        List(s"$DEFAULT_CATALOG.test_db.test_table0", 
s"$DEFAULT_CATALOG.test_db.test_table1"),
         List(),
         List(
-          ("a", Set("test_db.test_table0.a", "test_db.test_table1.a")),
-          ("b", Set("test_db.test_table0.b", "test_db.test_table1.b")),
-          ("c", Set("test_db.test_table0.b", "test_db.test_table1.c")))))
+          (
+            "a",
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_table0.a",
+              s"$DEFAULT_CATALOG.test_db.test_table1.a")),
+          (
+            "b",
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_table0.b",
+              s"$DEFAULT_CATALOG.test_db.test_table1.b")),
+          (
+            "c",
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_table0.b",
+              s"$DEFAULT_CATALOG.test_db.test_table1.c")))))
     }
   }
 
@@ -769,18 +838,20 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret0 = extractLineage(sql0)
       assert(ret0 == Lineage(
-        List("test_db.test_order_item"),
+        List(s"$DEFAULT_CATALOG.test_db.test_order_item"),
         List(),
         List(
-          ("stat_date", Set("test_db.test_order_item.stat_date")),
-          ("channel_id", Set("test_db.test_order_item.channel_id")),
-          ("sub_channel_id", Set("test_db.test_order_item.sub_channel_id")),
-          ("user_type", Set("test_db.test_order_item.user_type")),
-          ("country_name", Set("test_db.test_order_item.country_name")),
-          ("get_count0", Set("test_db.test_order_item.order_id")),
+          ("stat_date", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.stat_date")),
+          ("channel_id", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.channel_id")),
+          ("sub_channel_id", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.sub_channel_id")),
+          ("user_type", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.user_type")),
+          ("country_name", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.country_name")),
+          ("get_count0", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.order_id")),
           (
             "get_amount0",
-            Set("test_db.test_order_item.goods_count", 
"test_db.test_order_item.shop_price")),
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_order_item.goods_count",
+              s"$DEFAULT_CATALOG.test_db.test_order_item.shop_price")),
           ("add_time", Set[String]()))))
       val sql1 =
         """
@@ -807,24 +878,32 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret1 = extractLineage(sql1)
       assert(ret1 == Lineage(
-        List("test_db.test_order_item", "test_db.test_p0_order_item"),
+        List(
+          s"$DEFAULT_CATALOG.test_db.test_order_item",
+          s"$DEFAULT_CATALOG.test_db.test_p0_order_item"),
         List(),
         List(
           (
             "channel_id",
-            Set("test_db.test_order_item.channel_id", 
"test_db.test_p0_order_item.channel_id")),
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_order_item.channel_id",
+              s"$DEFAULT_CATALOG.test_db.test_p0_order_item.channel_id")),
           (
             "sub_channel_id",
             Set(
-              "test_db.test_order_item.sub_channel_id",
-              "test_db.test_p0_order_item.sub_channel_id")),
+              s"$DEFAULT_CATALOG.test_db.test_order_item.sub_channel_id",
+              s"$DEFAULT_CATALOG.test_db.test_p0_order_item.sub_channel_id")),
           (
             "country_name",
-            Set("test_db.test_order_item.country_name", 
"test_db.test_p0_order_item.country_name")),
-          ("get_count0", Set("test_db.test_order_item.order_id")),
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_order_item.country_name",
+              s"$DEFAULT_CATALOG.test_db.test_p0_order_item.country_name")),
+          ("get_count0", 
Set(s"$DEFAULT_CATALOG.test_db.test_order_item.order_id")),
           (
             "get_amount0",
-            Set("test_db.test_order_item.goods_count", 
"test_db.test_order_item.shop_price")),
+            Set(
+              s"$DEFAULT_CATALOG.test_db.test_order_item.goods_count",
+              s"$DEFAULT_CATALOG.test_db.test_order_item.shop_price")),
           ("add_time", Set[String]()))))
     }
   }
@@ -833,56 +912,56 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
     val sql0 = """select key as a, count(*) as b, 1 as c from 
test_db0.test_table0 group by key"""
     val ret0 = extractLineage(sql0)
     assert(ret0 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.key")),
-        ("b", Set("test_db0.test_table0.__count__")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
+        ("b", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
         ("c", Set()))))
 
     val sql1 = """select count(*) as a, 1 as b from test_db0.test_table0"""
     val ret1 = extractLineage(sql1)
     assert(ret1 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.__count__")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
         ("b", Set()))))
 
     val sql2 = """select every(key == 1) as a, 1 as b from 
test_db0.test_table0"""
     val ret2 = extractLineage(sql2)
     assert(ret2 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.key")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
         ("b", Set()))))
 
     val sql3 = """select count(*) as a, 1 as b from test_db0.test_table0"""
     val ret3 = extractLineage(sql3)
     assert(ret3 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.__count__")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.__count__")),
         ("b", Set()))))
 
     val sql4 = """select first(key) as a, 1 as b from test_db0.test_table0"""
     val ret4 = extractLineage(sql4)
     assert(ret4 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.key")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
         ("b", Set()))))
 
     val sql5 = """select avg(key) as a, 1 as b from test_db0.test_table0"""
     val ret5 = extractLineage(sql5)
     assert(ret5 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.key")),
+        ("a", Set(s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
         ("b", Set()))))
 
     val sql6 =
@@ -890,19 +969,27 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         | 1 as b from test_db0.test_table0""".stripMargin
     val ret6 = extractLineage(sql6)
     assert(ret6 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.value", "test_db0.test_table0.key")),
+        (
+          "a",
+          Set(
+            s"$DEFAULT_CATALOG.test_db0.test_table0.value",
+            s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
         ("b", Set()))))
 
     val sql7 = """select count(*) + sum(key) as a, 1 as b from 
test_db0.test_table0"""
     val ret7 = extractLineage(sql7)
     assert(ret7 == Lineage(
-      List("test_db0.test_table0"),
+      List(s"$DEFAULT_CATALOG.test_db0.test_table0"),
       List(),
       List(
-        ("a", Set("test_db0.test_table0.__count__", 
"test_db0.test_table0.key")),
+        (
+          "a",
+          Set(
+            s"$DEFAULT_CATALOG.test_db0.test_table0.__count__",
+            s"$DEFAULT_CATALOG.test_db0.test_table0.key")),
         ("b", Set()))))
 
   }
@@ -922,11 +1009,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret0 = extractLineage(sql0)
       assert(ret0 == Lineage(
-        List("default.table1", "default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table1", 
s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("aa", Set("default.table1.a")),
-          ("bb", Set("default.table0.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table0.b")))))
 
       val df0 = spark.sql("select a as a0, b as b0 from table0 where a = 2")
       df0.cache()
@@ -935,11 +1022,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val analyzed = df.queryExecution.analyzed
       val ret1 = SparkSQLLineageParseHelper(spark).transformToLineage(0, 
analyzed).get
       assert(ret1 == Lineage(
-        List("default.table0", "default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table0", 
s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("aa", Set("default.table0.a")),
-          ("bb", Set("default.table1.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
     }
   }
 
@@ -957,12 +1044,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret0 = extractLineage(sql0)
       assert(ret0 == Lineage(
-        List("default.table0", "default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table0", 
s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("aa", Set("default.table0.a")),
-          ("bb", Set("default.table1.b")),
-          ("cc", Set("default.table1.c")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")),
+          ("cc", Set(s"$DEFAULT_CATALOG.default.table1.c")))))
 
       val sql1 =
         """
@@ -970,11 +1057,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret1 = extractLineage(sql1)
       assert(ret1 == Lineage(
-        List("default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("aa", Set("default.table1.a")),
-          ("bb", Set("default.table1.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
 
       val sql2 =
         """
@@ -982,11 +1069,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret2 = extractLineage(sql2)
       assert(ret2 == Lineage(
-        List("default.table0", "default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table0", 
s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("aa", Set("default.table0.__count__")),
-          ("bb", Set("default.table1.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table0.__count__")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
 
       // ListQuery
       val sql3 =
@@ -995,12 +1082,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret3 = extractLineage(sql3)
       assert(ret3 == Lineage(
-        List("default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("a", Set("default.table0.a")),
-          ("b", Set("default.table0.b")),
-          ("c", Set("default.table0.c")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+          ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
 
       // Exists
       val sql4 =
@@ -1009,12 +1096,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret4 = extractLineage(sql4)
       assert(ret4 == Lineage(
-        List("default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("a", Set("default.table0.a")),
-          ("b", Set("default.table0.b")),
-          ("c", Set("default.table0.c")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+          ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
 
       val sql5 =
         """
@@ -1022,12 +1109,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret5 = extractLineage(sql5)
       assert(ret5 == Lineage(
-        List("default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("a", Set("default.table0.a")),
-          ("b", Set("default.table0.b")),
-          ("c", Set("default.table0.c")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+          ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
 
       val sql6 =
         """
@@ -1035,12 +1122,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret6 = extractLineage(sql6)
       assert(ret6 == Lineage(
-        List("default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("a", Set("default.table0.a")),
-          ("b", Set("default.table0.b")),
-          ("c", Set("default.table0.c")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+          ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
 
       val sql7 =
         """
@@ -1048,12 +1135,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret7 = extractLineage(sql7)
       assert(ret7 == Lineage(
-        List("default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("a", Set("default.table0.a")),
-          ("b", Set("default.table0.b")),
-          ("c", Set("default.table0.c")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table0.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table0.b")),
+          ("c", Set(s"$DEFAULT_CATALOG.default.table0.c")))))
 
       val sql8 =
         """
@@ -1061,11 +1148,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret8 = extractLineage(sql8)
       assert(ret8 == Lineage(
-        List("default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("(scalarsubquery() + 1)", Set("default.table1.a")),
-          ("bb", Set("default.table1.b")))))
+          ("(scalarsubquery() + 1)", 
Set(s"$DEFAULT_CATALOG.default.table1.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
 
       val sql9 =
         """
@@ -1073,11 +1160,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret9 = extractLineage(sql9)
       assert(ret9 == Lineage(
-        List("default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("aa", Set("default.table1.a")),
-          ("bb", Set("default.table1.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
 
       val sql10 =
         """
@@ -1086,11 +1173,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           |""".stripMargin
       val ret10 = extractLineage(sql10)
       assert(ret10 == Lineage(
-        List("default.table1", "default.table0"),
+        List(s"$DEFAULT_CATALOG.default.table1", 
s"$DEFAULT_CATALOG.default.table0"),
         List(),
         List(
-          ("aa", Set("default.table1.a", "default.table0.a")),
-          ("bb", Set("default.table1.b")))))
+          ("aa", Set(s"$DEFAULT_CATALOG.default.table1.a", 
s"$DEFAULT_CATALOG.default.table0.a")),
+          ("bb", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
 
       val sql11 =
         """
@@ -1099,11 +1186,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
 
       val ret11 = extractLineage(sql11)
       assert(ret11 == Lineage(
-        List("default.table1"),
+        List(s"$DEFAULT_CATALOG.default.table1"),
         List(),
         List(
-          ("a", Set("default.table1.a")),
-          ("b", Set("default.table1.b")))))
+          ("a", Set(s"$DEFAULT_CATALOG.default.table1.a")),
+          ("b", Set(s"$DEFAULT_CATALOG.default.table1.b")))))
     }
   }
 
@@ -1120,12 +1207,14 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
             s"count(distinct(b)) * count(distinct(c))" +
             s"from t2 group by a")
       assert(ret0 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.b")),
-          ("default.t1.c", Set("default.t2.b", "default.t2.c")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+          (
+            s"$DEFAULT_CATALOG.default.t1.c",
+            Set(s"$DEFAULT_CATALOG.default.t2.b", 
s"$DEFAULT_CATALOG.default.t2.c")))))
 
       val ret1 =
         extractLineage(
@@ -1166,12 +1255,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s"insert into table t1 select a,b,GROUPING__ID " +
             s"from t2 group by a,b,c,d grouping sets ((a,b,c), (a,b,d))")
       assert(ret0 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.b")),
-          ("default.t1.c", Set()))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+          (s"$DEFAULT_CATALOG.default.t1.c", Set()))))
     }
   }
 
@@ -1186,19 +1275,21 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s" where rank=1")
       val ret0 = extractLineage("insert overwrite table t1 select a, b from 
c1")
       assert(ret0 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.b")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
 
       val ret1 = extractLineage("insert overwrite table t1 select a, rank from 
c1")
       assert(ret1 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.a", "default.t2.b")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (
+            s"$DEFAULT_CATALOG.default.t1.b",
+            Set(s"$DEFAULT_CATALOG.default.t2.a", 
s"$DEFAULT_CATALOG.default.t2.b")))))
 
       spark.sql(
         s"cache table c2 select * from (" +
@@ -1206,11 +1297,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s" where rank=1")
       val ret2 = extractLineage("insert overwrite table t1 select a, b from 
c2")
       assert(ret2 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.b")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
 
       spark.sql(
         s"cache table c3 select * from (" +
@@ -1218,11 +1309,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
           s" from t2) where rank=1")
       val ret3 = extractLineage("insert overwrite table t1 select aa, bb from 
c3")
       assert(ret3 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set("default.t2.a")),
-          ("default.t1.b", Set("default.t2.b")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", 
Set(s"$DEFAULT_CATALOG.default.t2.a")),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")))))
     }
   }
 
@@ -1234,12 +1325,12 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         " ifnull(get_json_object(a, '$.b.imei'), get_json_object(a, 
'$.b.android_id'))) from t2)")
 
       assert(ret0 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set()),
-          ("default.t1.b", Set()),
-          ("default.t1.c", Set("default.t2.a")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+          (s"$DEFAULT_CATALOG.default.t1.b", Set()),
+          (s"$DEFAULT_CATALOG.default.t1.c", 
Set(s"$DEFAULT_CATALOG.default.t2.a")))))
     }
   }
 
@@ -1255,11 +1346,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
               s" from t2" +
               s" where a in ('HELLO') and c = 'HELLO'")
         assert(ret0 == Lineage(
-          List("default.t1"),
-          List("default.view_tst"),
+          List(s"$DEFAULT_CATALOG.default.t1"),
+          List(s"$DEFAULT_CATALOG.default.view_tst"),
           List(
-            ("default.view_tst.k", Set("default.t1.a")),
-            ("default.view_tst.b", Set("default.t1.b")))))
+            (s"$DEFAULT_CATALOG.default.view_tst.k", 
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+            (s"$DEFAULT_CATALOG.default.view_tst.b", 
Set(s"$DEFAULT_CATALOG.default.t1.b")))))
       }
     }
   }
@@ -1276,11 +1367,11 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
               s" from t2" +
               s" where a in ('HELLO') and c = 'HELLO'")
         assert(ret0 == Lineage(
-          List("default.t2"),
+          List(s"$DEFAULT_CATALOG.default.t2"),
           List(),
           List(
-            ("k", Set("default.t2.a")),
-            ("b", Set("default.t2.b")))))
+            ("k", Set(s"$DEFAULT_CATALOG.default.t2.a")),
+            ("b", Set(s"$DEFAULT_CATALOG.default.t2.b")))))
       }
     }
   }
@@ -1294,13 +1385,13 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
         " insert overwrite table t2 select a,b where a=1" +
         " insert overwrite table t3 select a,b where b=1")
       assert(ret0 == Lineage(
-        List("default.t1"),
-        List("default.t2", "default.t3"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2", s"$DEFAULT_CATALOG.default.t3"),
         List(
-          ("default.t2.a", Set("default.t1.a")),
-          ("default.t2.b", Set("default.t1.b")),
-          ("default.t3.a", Set("default.t1.a")),
-          ("default.t3.b", Set("default.t1.b")))))
+          (s"$DEFAULT_CATALOG.default.t2.a", 
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+          (s"$DEFAULT_CATALOG.default.t2.b", 
Set(s"$DEFAULT_CATALOG.default.t1.b")),
+          (s"$DEFAULT_CATALOG.default.t3.a", 
Set(s"$DEFAULT_CATALOG.default.t1.a")),
+          (s"$DEFAULT_CATALOG.default.t3.b", 
Set(s"$DEFAULT_CATALOG.default.t1.b")))))
     }
   }
 
@@ -1312,38 +1403,38 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
       val ret0 = extractLineage("insert into t1 select 1, t2.b, cc.action, 
t2.d " +
         "from t2 lateral view explode(split(c,'\\},\\{')) cc as action")
       assert(ret0 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set()),
-          ("default.t1.b", Set("default.t2.b")),
-          ("default.t1.c", Set("default.t2.c")),
-          ("default.t1.d", Set("default.t2.d")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+          (s"$DEFAULT_CATALOG.default.t1.c", 
Set(s"$DEFAULT_CATALOG.default.t2.c")),
+          (s"$DEFAULT_CATALOG.default.t1.d", 
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
 
       val ret1 = extractLineage("insert into t1 select 1, t2.b, cc.action0, 
dd.action1 " +
         "from t2 " +
         "lateral view explode(split(c,'\\},\\{')) cc as action0 " +
         "lateral view explode(split(d,'\\},\\{')) dd as action1")
       assert(ret1 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set()),
-          ("default.t1.b", Set("default.t2.b")),
-          ("default.t1.c", Set("default.t2.c")),
-          ("default.t1.d", Set("default.t2.d")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+          (s"$DEFAULT_CATALOG.default.t1.c", 
Set(s"$DEFAULT_CATALOG.default.t2.c")),
+          (s"$DEFAULT_CATALOG.default.t1.d", 
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
 
       val ret2 = extractLineage("insert into t1 select 1, t2.b, dd.pos, 
dd.action1 " +
         "from t2 " +
         "lateral view posexplode(split(d,'\\},\\{')) dd as pos, action1")
       assert(ret2 == Lineage(
-        List("default.t2"),
-        List("default.t1"),
+        List(s"$DEFAULT_CATALOG.default.t2"),
+        List(s"$DEFAULT_CATALOG.default.t1"),
         List(
-          ("default.t1.a", Set()),
-          ("default.t1.b", Set("default.t2.b")),
-          ("default.t1.c", Set("default.t2.d")),
-          ("default.t1.d", Set("default.t2.d")))))
+          (s"$DEFAULT_CATALOG.default.t1.a", Set()),
+          (s"$DEFAULT_CATALOG.default.t1.b", 
Set(s"$DEFAULT_CATALOG.default.t2.b")),
+          (s"$DEFAULT_CATALOG.default.t1.c", 
Set(s"$DEFAULT_CATALOG.default.t2.d")),
+          (s"$DEFAULT_CATALOG.default.t1.d", 
Set(s"$DEFAULT_CATALOG.default.t2.d")))))
     }
   }
 

Reply via email to