This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 408862af7 [KYUUBI #4814] Introduce Apache Atlas hook support in 
lineage plugin
408862af7 is described below

commit 408862af72db02cd7ae2074c19f24172a24d7db7
Author: wforget <[email protected]>
AuthorDate: Tue Jun 6 17:47:19 2023 +0800

    [KYUUBI #4814] Introduce Apache Atlas hook support in lineage plugin
    
    ### _Why are the changes needed?_
    
    Implements AtlasLineageDispatcher to send lineage to Apache Atlas.
    
    close #4814
    
    Atlas Spark Model Definition: 
https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json
    
    spark process:
    
    
![1](https://github.com/apache/kyuubi/assets/17894939/28e2c68c-0ffd-4f1d-b805-a7e964f85aab)
    
    table lineage:
    
    
![2](https://github.com/apache/kyuubi/assets/17894939/76b3db6d-ed50-42e3-97cf-2f96d4e403df)
    
    column lineage:
    
    
![3](https://github.com/apache/kyuubi/assets/17894939/41ae6ef8-acbf-43b9-ad05-42d669c5e950)
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [X] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4815 from wForget/KYUUBI-4814.
    
    Closes #4814
    
    3df8a7ec9 [wforget] comments
    c58eae7c5 [wforget] comments
    926bcf211 [wforget] comment
    e0b4067c3 [wforget] comment
    e4cc3e3f8 [wforget] comments
    adc72b96f [Bowen Liang] Update 
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
    e3bdd1c65 [Bowen Liang] Update 
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
    baf1711ac [Bowen Liang] Update 
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
    61e79f3d5 [Bowen Liang] Update 
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
    541df3780 [Bowen Liang] Update 
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
    5dd310657 [wforget] fix
    cea1e137d [wforget] fix
    f028d4b09 [wforget] fix
    0c9b4516b [wforget] fix
    6f8113032 [wforget] add close atlas client shutdown hook
    3f4d2a7db [wforget] add remote user
    a0db58afc [wforget] comments
    6dd3c66df [wforget] comments
    f2b2a30dc [wforget] style
    83eb1e481 [wforget] add atlas.column.lineage.enable configuration
    0719a2b65 [wforget] doc
    05f936005 [wforget] fix
    d169b661d [wforget] fix
    6da80d742 [wforget] fix
    820ae5c5f [wforget] column lineages
    dabe8173e [wforget] license
    f22e044d2 [wforget] test
    b948bce90 [wforget] fix and add test
    0aef1be6b [wforget] fix
    368b5ab3f [wforget] [KYUUBI-4814] Implements AtlasLineageDispatcher to send 
lineage to Apache Atlas
    
    Lead-authored-by: wforget <[email protected]>
    Co-authored-by: Bowen Liang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/extensions/engines/spark/lineage.md           |  22 +++
 extensions/spark/kyuubi-spark-lineage/pom.xml      |  86 ++++++++++-
 .../kyuubi/plugin/lineage/LineageDispatcher.scala  |   2 +
 .../plugin/lineage/LineageDispatcherType.scala     |   2 +-
 .../lineage/dispatcher/atlas/AtlasClient.scala     |  96 +++++++++++++
 .../lineage/dispatcher/atlas/AtlasClientConf.scala |  57 ++++++++
 .../dispatcher/atlas/AtlasEntityHelper.scala       | 158 +++++++++++++++++++++
 .../dispatcher/atlas/AtlasLineageDispatcher.scala  |  49 +++++++
 .../atlas/ConfigEntry.scala}                       |   8 +-
 .../lineage/helper/SparkListenerHelper.scala       |   9 ++
 .../apache/spark/kyuubi/lineage/LineageConf.scala  |   1 +
 .../test/resources/atlas-application.properties    |  18 +++
 .../atlas/AtlasLineageDispatcherSuite.scala        | 153 ++++++++++++++++++++
 pom.xml                                            |   1 +
 14 files changed, 648 insertions(+), 14 deletions(-)

diff --git a/docs/extensions/engines/spark/lineage.md 
b/docs/extensions/engines/spark/lineage.md
index 665929e9f..01acd884d 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -185,6 +185,7 @@ The lineage dispatchers are used to dispatch lineage 
events, configured via `spa
 <ul>
   <li>SPARK_EVENT (by default): send lineage event to spark event bus</li>
   <li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>
+  <li>ATLAS: send lineage to apache atlas</li>
 </ul>
 
 #### Get Lineage Events from SparkListener
@@ -207,3 +208,24 @@ spark.sparkContext.addSparkListener(new SparkListener {
 #### Get Lineage Events from Kyuubi EventHandler
 
 When using the `KYUUBI_EVENT` dispatcher, the lineage events will be sent to 
the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) to 
handle kyuubi events.
+
+#### Ingest Lineage Entities to Apache Atlas
+
+The lineage entities can be ingested into [Apache 
Atlas](https://atlas.apache.org/) using the `ATLAS` dispatcher.
+
+Extra works:
+
++ The least transitive dependencies needed, which are under 
`./extensions/spark/kyuubi-spark-lineage/target/scala-${scala.binary.version}/jars`
++ Use `spark.files` to specify the `atlas-application.properties` 
configuration file for Atlas
+
+Atlas Client configurations (Configure in `atlas-application.properties` or 
passed in `spark.atlas.` prefix):
+
+|                  Name                   |     Default Value      |           
           Description                      | Since |
+|-----------------------------------------|------------------------|-------------------------------------------------------|-------|
+| atlas.rest.address                      | http://localhost:21000 | The rest 
endpoint url for the Atlas server            | 1.8.0 |
+| atlas.client.type                       | rest                   | The 
client type (currently only supports rest)        | 1.8.0 |
+| atlas.client.username                   | none                   | The 
client username                                   | 1.8.0 |
+| atlas.client.password                   | none                   | The 
client password                                   | 1.8.0 |
+| atlas.cluster.name                      | primary                | The 
cluster name to use in qualifiedName of entities. | 1.8.0 |
+| atlas.hook.spark.column.lineage.enabled | true                   | Whether 
to ingest column lineages to Atlas.           | 1.8.0 |
+
diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml 
b/extensions/spark/kyuubi-spark-lineage/pom.xml
index 2a7ba7773..760b0cc08 100644
--- a/extensions/spark/kyuubi-spark-lineage/pom.xml
+++ b/extensions/spark/kyuubi-spark-lineage/pom.xml
@@ -59,7 +59,85 @@
         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
-            <scope>test</scope>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+            <version>${atlas.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jul-to-slf4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.springframework</groupId>
+                    <artifactId>spring-context</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-text</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -89,12 +167,6 @@
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
index 8f5dc0d9e..b993f1428 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage
 import org.apache.spark.sql.execution.QueryExecution
 
 import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, 
SparkEventDispatcher}
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher
 
 trait LineageDispatcher {
 
@@ -35,6 +36,7 @@ object LineageDispatcher {
     LineageDispatcherType.withName(dispatcherType) match {
       case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
       case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
+      case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher()
       case _ => throw new UnsupportedOperationException(
           s"Unsupported lineage dispatcher: $dispatcherType.")
     }
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
index d6afea152..8e07f6d77 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
@@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage
 object LineageDispatcherType extends Enumeration {
   type LineageDispatcherType = Value
 
-  val SPARK_EVENT, KYUUBI_EVENT = Value
+  val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value
 }
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
new file mode 100644
index 000000000..15b127182
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import java.util.Locale
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.atlas.AtlasClientV2
+import org.apache.atlas.model.instance.AtlasEntity
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.util.ShutdownHookManager
+
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasClientConf._
+
+trait AtlasClient extends AutoCloseable {
+  def send(entities: Seq[AtlasEntity]): Unit
+}
+
+class AtlasRestClient(conf: AtlasClientConf) extends AtlasClient {
+
+  private val atlasClient: AtlasClientV2 = {
+    val serverUrl = conf.get(ATLAS_REST_ENDPOINT).split(",")
+    val username = conf.get(CLIENT_USERNAME)
+    val password = conf.get(CLIENT_PASSWORD)
+    if (StringUtils.isNoneBlank(username, password)) {
+      new AtlasClientV2(serverUrl, Array(username, password))
+    } else {
+      new AtlasClientV2(serverUrl: _*)
+    }
+  }
+
+  override def send(entities: Seq[AtlasEntity]): Unit = {
+    val entitiesWithExtInfo = new AtlasEntitiesWithExtInfo()
+    entities.foreach(entitiesWithExtInfo.addEntity)
+    atlasClient.createEntities(entitiesWithExtInfo)
+  }
+
+  override def close(): Unit = {
+    if (atlasClient != null) {
+      atlasClient.close()
+    }
+  }
+}
+
+object AtlasClient {
+
+  @volatile private var client: AtlasClient = _
+
+  def getClient(): AtlasClient = {
+    if (client == null) {
+      AtlasClient.synchronized {
+        if (client == null) {
+          val clientConf = AtlasClientConf.getConf()
+          client = clientConf.get(CLIENT_TYPE).toLowerCase(Locale.ROOT) match {
+            case "rest" => new AtlasRestClient(clientConf)
+            case unknown => throw new RuntimeException(s"Unsupported client 
type: $unknown.")
+          }
+          registerCleanupShutdownHook(client)
+        }
+      }
+    }
+    client
+  }
+
+  private def registerCleanupShutdownHook(client: AtlasClient): Unit = {
+    ShutdownHookManager.get.addShutdownHook(
+      () => {
+        if (client != null) {
+          client.close()
+        }
+      },
+      Integer.MAX_VALUE)
+  }
+
+  @VisibleForTesting
+  private[dispatcher] def setClient(newClient: AtlasClient): Unit = {
+    client = newClient
+  }
+
+}
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
new file mode 100644
index 000000000..03b1a83e0
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import org.apache.atlas.ApplicationProperties
+import org.apache.commons.configuration.Configuration
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+
+class AtlasClientConf(configuration: Configuration) {
+
+  def get(entry: ConfigEntry): String = {
+    configuration.getProperty(entry.key) match {
+      case s: String => s
+      case l: List[_] => l.mkString(",")
+      case o if o != null => o.toString
+      case _ => entry.defaultValue
+    }
+  }
+
+}
+
+object AtlasClientConf {
+
+  private lazy val clientConf: AtlasClientConf = {
+    val conf = ApplicationProperties.get()
+    
SparkContextHelper.globalSparkContext.getConf.getAllWithPrefix("spark.atlas.")
+      .foreach { case (k, v) => conf.setProperty(s"atlas.$k", v) }
+    new AtlasClientConf(conf)
+  }
+
+  def getConf(): AtlasClientConf = clientConf
+
+  val ATLAS_REST_ENDPOINT = ConfigEntry("atlas.rest.address", 
"http://localhost:21000")
+
+  val CLIENT_TYPE = ConfigEntry("atlas.client.type", "rest")
+  val CLIENT_USERNAME = ConfigEntry("atlas.client.username", null)
+  val CLIENT_PASSWORD = ConfigEntry("atlas.client.password", null)
+
+  val CLUSTER_NAME = ConfigEntry("atlas.cluster.name", "primary")
+
+  val COLUMN_LINEAGE_ENABLED = 
ConfigEntry("atlas.hook.spark.column.lineage.enabled", "true")
+}
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
new file mode 100644
index 000000000..9575b5258
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import scala.collection.JavaConverters._
+
+import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, 
AtlasRelatedObjectId}
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper
+
+/**
+ * The helpers for Atlas spark entities from Lineage.
+ * The Atlas spark models refer to :
+ *    
https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json
+ */
+object AtlasEntityHelper {
+
+  /**
+   * Generate `spark_process` Atlas Entity from Lineage.
+   * @param qe
+   * @param lineage
+   * @return
+   */
+  def processEntity(qe: QueryExecution, lineage: Lineage): AtlasEntity = {
+    val entity = new AtlasEntity(PROCESS_TYPE)
+
+    val appId = SparkContextHelper.globalSparkContext.applicationId
+    val appName = SparkContextHelper.globalSparkContext.appName match {
+      case "Spark shell" => s"Spark Job $appId"
+      case default => s"$default $appId"
+    }
+
+    entity.setAttribute("qualifiedName", appId)
+    entity.setAttribute("name", appName)
+    entity.setAttribute("currUser", SparkListenerHelper.currentUser)
+    SparkListenerHelper.sessionUser.foreach(entity.setAttribute("remoteUser", 
_))
+    entity.setAttribute("executionId", qe.id)
+    entity.setAttribute("details", qe.toString())
+    entity.setAttribute("sparkPlanDescription", qe.sparkPlan.toString())
+
+    // TODO add entity type instead of parsing from string
+    val inputs = lineage.inputTables.flatMap(tableObjectId).map { objId =>
+      relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
+    }
+    val outputs = lineage.outputTables.flatMap(tableObjectId).map { objId =>
+      relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
+    }
+
+    entity.setRelationshipAttribute("inputs", inputs.asJava)
+    entity.setRelationshipAttribute("outputs", outputs.asJava)
+
+    entity
+  }
+
+  /**
+   * Generate `spark_column_lineage` Atlas Entity from Lineage.
+   * @param processEntity
+   * @param lineage
+   * @return
+   */
+  def columnLineageEntities(processEntity: AtlasEntity, lineage: Lineage): 
Seq[AtlasEntity] = {
+    lineage.columnLineage.flatMap(columnLineage => {
+      val inputs = columnLineage.originalColumns.flatMap(columnObjectId).map { 
objId =>
+        relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
+      }
+      val outputs = Option(columnLineage.column).flatMap(columnObjectId).map { 
objId =>
+        relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
+      }.toSeq
+
+      if (inputs.nonEmpty && outputs.nonEmpty) {
+        val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
+        val qualifiedName =
+          
s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
+        entity.setAttribute("qualifiedName", qualifiedName)
+        entity.setAttribute("name", qualifiedName)
+        entity.setRelationshipAttribute("inputs", inputs.asJava)
+        entity.setRelationshipAttribute("outputs", outputs.asJava)
+        entity.setRelationshipAttribute(
+          "process",
+          relatedObjectId(objectId(processEntity), 
RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE))
+        Some(entity)
+      } else {
+        None
+      }
+    })
+  }
+
+  def tableObjectId(tableName: String): Option[AtlasObjectId] = {
+    val dbTb = tableName.split('.')
+    if (dbTb.length == 2) {
+      val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
+      // TODO parse datasource type
+      Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
+    } else {
+      None
+    }
+  }
+
+  def tableQualifiedName(cluster: String, db: String, table: String): String = 
{
+    s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
+  }
+
+  def columnObjectId(columnName: String): Option[AtlasObjectId] = {
+    val dbTbCol = columnName.split('.')
+    if (dbTbCol.length == 3) {
+      val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), 
dbTbCol(2))
+      // TODO parse datasource type
+      Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
+    } else {
+      None
+    }
+  }
+
+  def columnQualifiedName(cluster: String, db: String, table: String, column: 
String): String = {
+    s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
+  }
+
+  def objectId(entity: AtlasEntity): AtlasObjectId = {
+    val objId = new AtlasObjectId(entity.getGuid, entity.getTypeName)
+    objId.setUniqueAttributes(Map("qualifiedName" -> 
entity.getAttribute("qualifiedName")).asJava)
+    objId
+  }
+
+  def relatedObjectId(objectId: AtlasObjectId, relationshipType: String): 
AtlasRelatedObjectId = {
+    new AtlasRelatedObjectId(objectId, relationshipType)
+  }
+
+  lazy val cluster = 
AtlasClientConf.getConf().get(AtlasClientConf.CLUSTER_NAME)
+  lazy val columnLineageEnabled =
+    
AtlasClientConf.getConf().get(AtlasClientConf.COLUMN_LINEAGE_ENABLED).toBoolean
+
+  val HIVE_TABLE_TYPE = "hive_table"
+  val HIVE_COLUMN_TYPE = "hive_column"
+  val PROCESS_TYPE = "spark_process"
+  val COLUMN_LINEAGE_TYPE = "spark_column_lineage"
+  val RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"
+  val RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"
+  val RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE = 
"spark_process_column_lineages"
+
+}
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
new file mode 100644
index 000000000..c66b51107
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
+import 
org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.columnLineageEnabled
+
+class AtlasLineageDispatcher extends LineageDispatcher with Logging {
+
+  override def send(qe: QueryExecution, lineageOpt: Option[Lineage]): Unit = {
+    try {
+      lineageOpt.filter(l => l.inputTables.nonEmpty || 
l.outputTables.nonEmpty).foreach(lineage => {
+        val processEntity = AtlasEntityHelper.processEntity(qe, lineage)
+        val columnLineageEntities = if (lineage.columnLineage.nonEmpty && 
columnLineageEnabled) {
+          AtlasEntityHelper.columnLineageEntities(processEntity, lineage)
+        } else {
+          Seq.empty
+        }
+        AtlasClient.getClient().send(processEntity +: columnLineageEntities)
+      })
+    } catch {
+      case t: Throwable =>
+        logWarning("Send lineage to atlas failed.", t)
+    }
+  }
+
+  override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+    // ignore
+  }
+
+}
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
similarity index 82%
copy from 
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
copy to 
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
index d6afea152..3f9d9831d 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
@@ -15,10 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.lineage
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
 
-object LineageDispatcherType extends Enumeration {
-  type LineageDispatcherType = Value
-
-  val SPARK_EVENT, KYUUBI_EVENT = Value
-}
+case class ConfigEntry(key: String, defaultValue: String)
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
index 57db55a1e..a1747493e 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
@@ -17,7 +17,9 @@
 
 package org.apache.kyuubi.plugin.lineage.helper
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
 
 import org.apache.kyuubi.util.SemanticVersion
 
@@ -40,4 +42,11 @@ object SparkListenerHelper {
   def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
     SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
   }
+
+  def currentUser: String = 
UserGroupInformation.getCurrentUser.getShortUserName
+
+  def sessionUser: Option[String] =
+    
Option(SparkContextHelper.globalSparkContext.getLocalProperty(KYUUBI_SESSION_USER))
+
+  final val KYUUBI_SESSION_USER = "kyuubi.session.user"
 }
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 6fb5399c0..5b7d3dfe1 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -35,6 +35,7 @@ object LineageConf {
       "`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching 
lineage events.<ul>" +
       "<li>SPARK_EVENT: send lineage event to spark event bus</li>" +
       "<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>" +
+      "<li>ATLAS: send lineage to apache atlas</li>" +
       "</ul>")
     .version("1.8.0")
     .stringConf
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
 
b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
new file mode 100644
index 000000000..e6dc52f98
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+atlas.cluster.name=test
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
new file mode 100644
index 000000000..cb98c52ef
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.List
+
+import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, 
SKIP_PARSING_PERMANENT_VIEW_ENABLED}
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.sql.SparkListenerExtensionTest
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.plugin.lineage.Lineage
+import 
org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE,
 PROCESS_TYPE}
+import 
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
+
/**
 * Verifies that the ATLAS lineage dispatcher turns captured table/column lineage
 * into Atlas process and column-lineage entities and hands them to [[AtlasClient]].
 * A [[MockAtlasClient]] is installed so no real Atlas server is contacted.
 */
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
  // The in-memory table catalog class moved packages between Spark 3.1 and 3.2.
  val catalogName =
    if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
    else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"

  override protected val catalogImpl: String = "hive"

  override def sparkConf(): SparkConf = {
    super.sparkConf()
      .set("spark.sql.catalog.v2_catalog", catalogName)
      .set(
        "spark.sql.queryExecutionListeners",
        "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
      .set(DISPATCHERS.key, "ATLAS")
      .set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true")
  }

  override def afterAll(): Unit = {
    spark.stop()
    super.afterAll()
  }

  test("atlas lineage capture: insert into select sql") {
    val mockAtlasClient = new MockAtlasClient()
    AtlasClient.setClient(mockAtlasClient)

    // NOTE(review): only test_table0 is registered for cleanup here; test_table1 is
    // created but not dropped — confirm whether withTable accepts multiple names.
    withTable("test_table0") { _ =>
      spark.sql("create table test_table0(a string, b int, c int)")
      spark.sql("create table test_table1(a string, d int)")
      spark.sql("insert into test_table1 select a, b + c as d from test_table0").collect()
      val expected = Lineage(
        List("default.test_table0"),
        List("default.test_table1"),
        List(
          ("default.test_table1.a", Set("default.test_table0.a")),
          ("default.test_table1.d", Set("default.test_table0.b", "default.test_table0.c"))))
      // The listener dispatches asynchronously; poll until the mock client has received entities.
      eventually(Timeout(5.seconds)) {
        assert(mockAtlasClient.getEntities != null && mockAtlasClient.getEntities.nonEmpty)
      }
      // First entity is the spark_process; the rest are column-lineage entities.
      checkAtlasProcessEntity(mockAtlasClient.getEntities.head, expected)
      checkAtlasColumnLineageEntities(
        mockAtlasClient.getEntities.head,
        mockAtlasClient.getEntities.tail,
        expected)
    }

  }

  /** Asserts the process entity's attributes and its input/output table relationships. */
  def checkAtlasProcessEntity(entity: AtlasEntity, expected: Lineage): Unit = {
    assert(entity.getTypeName == PROCESS_TYPE)

    // The Spark application id is used as the process entity's unique qualified name.
    val appId = SparkContextHelper.globalSparkContext.applicationId
    assert(entity.getAttribute("qualifiedName") == appId)
    assert(entity.getAttribute("name")
      == s"${SparkContextHelper.globalSparkContext.appName} $appId")
    assert(StringUtils.isNotBlank(entity.getAttribute("currUser").asInstanceOf[String]))
    assert(entity.getAttribute("executionId") != null)
    assert(StringUtils.isNotBlank(entity.getAttribute("details").asInstanceOf[String]))
    assert(StringUtils.isNotBlank(entity.getAttribute("sparkPlanDescription").asInstanceOf[String]))

    val inputs = entity.getRelationshipAttribute("inputs")
      .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
    val outputs = entity.getRelationshipAttribute("outputs")
      .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
    // Table qualified names are suffixed with the Atlas cluster name.
    assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
    assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
  }

  /**
   * Asserts one column-lineage entity per expected column mapping: qualified name,
   * input/output column relationships, and the back-reference to the process entity.
   */
  def checkAtlasColumnLineageEntities(
      processEntity: AtlasEntity,
      entities: Seq[AtlasEntity],
      expected: Lineage): Unit = {
    assert(entities.size == expected.columnLineage.size)

    entities.zip(expected.columnLineage).foreach {
      case (entity, expectedLineage) =>
        assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
        // Column-lineage qualified name is "<processQualifiedName>:<outputColumn>".
        val expectedQualifiedName =
          s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
        assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
        assert(entity.getAttribute("name") == expectedQualifiedName)

        val inputs = entity.getRelationshipAttribute("inputs")
          .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
        assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet)

        val outputs = entity.getRelationshipAttribute("outputs")
          .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
        assert(outputs.size == 1)
        assert(s"${expectedLineage.column}@$cluster" == outputs.head)

        assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
          AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))
    }
  }

  // Pre-set cluster name for testing in `test/resources/atlas-application.properties`
  private val cluster = "test"

  /** Extracts the `qualifiedName` unique attribute from an Atlas object id. */
  def getQualifiedName(objId: AtlasObjectId): String = {
    objId.getUniqueAttributes.get("qualifiedName").asInstanceOf[String]
  }

  /** Captures the entities that would be sent to Atlas instead of performing I/O. */
  class MockAtlasClient() extends AtlasClient {
    private var _entities: Seq[AtlasEntity] = _

    override def send(entities: Seq[AtlasEntity]): Unit = {
      _entities = entities
    }

    def getEntities: Seq[AtlasEntity] = _entities

    override def close(): Unit = {}
  }
}
diff --git a/pom.xml b/pom.xml
index 3b08ef622..515c15af9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
         <antlr4.version>4.9.3</antlr4.version>
         <antlr.st4.version>4.3.4</antlr.st4.version>
         
<apache.archive.dist>https://archive.apache.org/dist</apache.archive.dist>
+        <atlas.version>2.3.0</atlas.version>
         <bouncycastle.version>1.67</bouncycastle.version>
         <codahale.metrics.version>4.2.8</codahale.metrics.version>
         <commons-cli.version>1.5.0</commons-cli.version>


Reply via email to