This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 408862af7 [KYUUBI #4814] Introduce Apache Atlas hook support in lineage plugin
408862af7 is described below
commit 408862af72db02cd7ae2074c19f24172a24d7db7
Author: wforget <[email protected]>
AuthorDate: Tue Jun 6 17:47:19 2023 +0800
[KYUUBI #4814] Introduce Apache Atlas hook support in lineage plugin
### _Why are the changes needed?_
Implements AtlasLineageDispatcher to send lineage to Apache Atlas.
close #4814
Atlas Spark Model Definition:
https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json
Screenshots in the PR show the resulting entities: spark process, table lineage, and column lineage.
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [X] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4815 from wForget/KYUUBI-4814.
Closes #4814
3df8a7ec9 [wforget] comments
c58eae7c5 [wforget] comments
926bcf211 [wforget] comment
e0b4067c3 [wforget] comment
e4cc3e3f8 [wforget] comments
adc72b96f [Bowen Liang] Update
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
e3bdd1c65 [Bowen Liang] Update
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
baf1711ac [Bowen Liang] Update
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
61e79f3d5 [Bowen Liang] Update
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
541df3780 [Bowen Liang] Update
extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
5dd310657 [wforget] fix
cea1e137d [wforget] fix
f028d4b09 [wforget] fix
0c9b4516b [wforget] fix
6f8113032 [wforget] add close atlas client shutdown hook
3f4d2a7db [wforget] add remote user
a0db58afc [wforget] comments
6dd3c66df [wforget] comments
f2b2a30dc [wforget] style
83eb1e481 [wforget] add atlas.column.lineage.enable configuration
0719a2b65 [wforget] doc
05f936005 [wforget] fix
d169b661d [wforget] fix
6da80d742 [wforget] fix
820ae5c5f [wforget] column lineages
dabe8173e [wforget] license
f22e044d2 [wforget] test
b948bce90 [wforget] fix and add test
0aef1be6b [wforget] fix
368b5ab3f [wforget] [KYUUBI-4814] Implements AtlasLineageDispatcher to send lineage to Apache Atlas
Lead-authored-by: wforget <[email protected]>
Co-authored-by: Bowen Liang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/extensions/engines/spark/lineage.md | 22 +++
extensions/spark/kyuubi-spark-lineage/pom.xml | 86 ++++++++++-
.../kyuubi/plugin/lineage/LineageDispatcher.scala | 2 +
.../plugin/lineage/LineageDispatcherType.scala | 2 +-
.../lineage/dispatcher/atlas/AtlasClient.scala | 96 +++++++++++++
.../lineage/dispatcher/atlas/AtlasClientConf.scala | 57 ++++++++
.../dispatcher/atlas/AtlasEntityHelper.scala | 158 +++++++++++++++++++++
.../dispatcher/atlas/AtlasLineageDispatcher.scala | 49 +++++++
.../atlas/ConfigEntry.scala} | 8 +-
.../lineage/helper/SparkListenerHelper.scala | 9 ++
.../apache/spark/kyuubi/lineage/LineageConf.scala | 1 +
.../test/resources/atlas-application.properties | 18 +++
.../atlas/AtlasLineageDispatcherSuite.scala | 153 ++++++++++++++++++++
pom.xml | 1 +
14 files changed, 648 insertions(+), 14 deletions(-)
diff --git a/docs/extensions/engines/spark/lineage.md
b/docs/extensions/engines/spark/lineage.md
index 665929e9f..01acd884d 100644
--- a/docs/extensions/engines/spark/lineage.md
+++ b/docs/extensions/engines/spark/lineage.md
@@ -185,6 +185,7 @@ The lineage dispatchers are used to dispatch lineage events, configured via `spa
<ul>
<li>SPARK_EVENT (by default): send lineage event to spark event bus</li>
<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>
+ <li>ATLAS: send lineage to apache atlas</li>
</ul>
#### Get Lineage Events from SparkListener
@@ -207,3 +208,24 @@ spark.sparkContext.addSparkListener(new SparkListener {
#### Get Lineage Events from Kyuubi EventHandler
When using the `KYUUBI_EVENT` dispatcher, the lineage events will be sent to the Kyuubi `EventBus`. Refer to [Kyuubi Event Handler](../../server/events) to handle kyuubi events.
+
+#### Ingest Lineage Entities to Apache Atlas
+
+The lineage entities can be ingested into [Apache Atlas](https://atlas.apache.org/) using the `ATLAS` dispatcher.
+
+Extra steps:
+
++ The minimal set of transitive dependencies needed is under `./extensions/spark/kyuubi-spark-lineage/target/scala-${scala.binary.version}/jars`
++ Use `spark.files` to specify the `atlas-application.properties` configuration file for Atlas
+
+Atlas client configurations (set in `atlas-application.properties` or passed with the `spark.atlas.` prefix):
+
+| Name                                    | Default Value          | Description                                            | Since |
+|-----------------------------------------|------------------------|--------------------------------------------------------|-------|
+| atlas.rest.address                      | http://localhost:21000 | The rest endpoint url for the Atlas server              | 1.8.0 |
+| atlas.client.type                       | rest                   | The client type (currently only supports rest)          | 1.8.0 |
+| atlas.client.username                   | none                   | The client username                                     | 1.8.0 |
+| atlas.client.password                   | none                   | The client password                                     | 1.8.0 |
+| atlas.cluster.name                      | primary                | The cluster name to use in qualifiedName of entities.   | 1.8.0 |
+| atlas.hook.spark.column.lineage.enabled | true                   | Whether to ingest column lineages to Atlas.             | 1.8.0 |
+
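To make the setup above concrete, here is a minimal Spark-side sketch. It is not part of this patch: the dispatchers key is assumed to be the `spark.kyuubi.plugin.lineage.dispatchers` option from the lineage plugin docs, and the endpoint, cluster name, and file path are placeholders.

    // Minimal sketch (assumptions noted above): enable the lineage listener,
    // select the ATLAS dispatcher, ship the Atlas properties file, and override
    // individual Atlas client settings via the `spark.atlas.` prefix.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set(
        "spark.sql.queryExecutionListeners",
        "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
      .set("spark.kyuubi.plugin.lineage.dispatchers", "ATLAS") // assumed full key, see docs above
      .set("spark.files", "/path/to/atlas-application.properties")
      // `spark.atlas.rest.address` overrides `atlas.rest.address`, and so on
      .set("spark.atlas.rest.address", "http://atlas-host:21000")
      .set("spark.atlas.cluster.name", "primary")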
diff --git a/extensions/spark/kyuubi-spark-lineage/pom.xml
b/extensions/spark/kyuubi-spark-lineage/pom.xml
index 2a7ba7773..760b0cc08 100644
--- a/extensions/spark/kyuubi-spark-lineage/pom.xml
+++ b/extensions/spark/kyuubi-spark-lineage/pom.xml
@@ -59,7 +59,85 @@
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ <version>${atlas.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -89,12 +167,6 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
index 8f5dc0d9e..b993f1428 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcher.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.lineage
import org.apache.spark.sql.execution.QueryExecution
import org.apache.kyuubi.plugin.lineage.dispatcher.{KyuubiEventDispatcher, SparkEventDispatcher}
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasLineageDispatcher
trait LineageDispatcher {
@@ -35,6 +36,7 @@ object LineageDispatcher {
LineageDispatcherType.withName(dispatcherType) match {
case LineageDispatcherType.SPARK_EVENT => new SparkEventDispatcher()
case LineageDispatcherType.KYUUBI_EVENT => new KyuubiEventDispatcher()
+ case LineageDispatcherType.ATLAS => new AtlasLineageDispatcher()
case _ => throw new UnsupportedOperationException(
s"Unsupported lineage dispatcher: $dispatcherType.")
}
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
index d6afea152..8e07f6d77 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
@@ -20,5 +20,5 @@ package org.apache.kyuubi.plugin.lineage
object LineageDispatcherType extends Enumeration {
type LineageDispatcherType = Value
- val SPARK_EVENT, KYUUBI_EVENT = Value
+ val SPARK_EVENT, KYUUBI_EVENT, ATLAS = Value
}
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
new file mode 100644
index 000000000..15b127182
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClient.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import java.util.Locale
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.atlas.AtlasClientV2
+import org.apache.atlas.model.instance.AtlasEntity
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.util.ShutdownHookManager
+
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasClientConf._
+
+trait AtlasClient extends AutoCloseable {
+ def send(entities: Seq[AtlasEntity]): Unit
+}
+
+class AtlasRestClient(conf: AtlasClientConf) extends AtlasClient {
+
+ private val atlasClient: AtlasClientV2 = {
+ val serverUrl = conf.get(ATLAS_REST_ENDPOINT).split(",")
+ val username = conf.get(CLIENT_USERNAME)
+ val password = conf.get(CLIENT_PASSWORD)
+ if (StringUtils.isNoneBlank(username, password)) {
+ new AtlasClientV2(serverUrl, Array(username, password))
+ } else {
+ new AtlasClientV2(serverUrl: _*)
+ }
+ }
+
+ override def send(entities: Seq[AtlasEntity]): Unit = {
+ val entitiesWithExtInfo = new AtlasEntitiesWithExtInfo()
+ entities.foreach(entitiesWithExtInfo.addEntity)
+ atlasClient.createEntities(entitiesWithExtInfo)
+ }
+
+ override def close(): Unit = {
+ if (atlasClient != null) {
+ atlasClient.close()
+ }
+ }
+}
+
+object AtlasClient {
+
+ @volatile private var client: AtlasClient = _
+
+ def getClient(): AtlasClient = {
+ if (client == null) {
+ AtlasClient.synchronized {
+ if (client == null) {
+ val clientConf = AtlasClientConf.getConf()
+ client = clientConf.get(CLIENT_TYPE).toLowerCase(Locale.ROOT) match {
+ case "rest" => new AtlasRestClient(clientConf)
+ case unknown => throw new RuntimeException(s"Unsupported client type: $unknown.")
+ }
+ registerCleanupShutdownHook(client)
+ }
+ }
+ }
+ client
+ }
+
+ private def registerCleanupShutdownHook(client: AtlasClient): Unit = {
+ ShutdownHookManager.get.addShutdownHook(
+ () => {
+ if (client != null) {
+ client.close()
+ }
+ },
+ Integer.MAX_VALUE)
+ }
+
+ @VisibleForTesting
+ private[dispatcher] def setClient(newClient: AtlasClient): Unit = {
+ client = newClient
+ }
+
+}
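The `@VisibleForTesting` setter above is the seam the test suite later in this patch uses to avoid a live Atlas server. A minimal sketch of such a stub (hypothetical class name; it must sit under the `org.apache.kyuubi.plugin.lineage.dispatcher` package to reach the `private[dispatcher]` setter, as the suite's MockAtlasClient does):

    package org.apache.kyuubi.plugin.lineage.dispatcher.atlas

    import org.apache.atlas.model.instance.AtlasEntity

    // Records the last batch of entities instead of calling the Atlas REST API.
    class RecordingAtlasClient extends AtlasClient {
      @volatile var captured: Seq[AtlasEntity] = Seq.empty
      override def send(entities: Seq[AtlasEntity]): Unit = captured = entities
      override def close(): Unit = ()
    }

    // e.g. in a test's setup, before any lineage is dispatched, so that
    // AtlasClient.getClient() returns the stub instead of an AtlasRestClient:
    // AtlasClient.setClient(new RecordingAtlasClient)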
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
new file mode 100644
index 000000000..03b1a83e0
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasClientConf.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import org.apache.atlas.ApplicationProperties
+import org.apache.commons.configuration.Configuration
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+
+class AtlasClientConf(configuration: Configuration) {
+
+ def get(entry: ConfigEntry): String = {
+ configuration.getProperty(entry.key) match {
+ case s: String => s
+ case l: List[_] => l.mkString(",")
+ case o if o != null => o.toString
+ case _ => entry.defaultValue
+ }
+ }
+
+}
+
+object AtlasClientConf {
+
+ private lazy val clientConf: AtlasClientConf = {
+ val conf = ApplicationProperties.get()
+ SparkContextHelper.globalSparkContext.getConf.getAllWithPrefix("spark.atlas.")
+ .foreach { case (k, v) => conf.setProperty(s"atlas.$k", v) }
+ new AtlasClientConf(conf)
+ }
+
+ def getConf(): AtlasClientConf = clientConf
+
+ val ATLAS_REST_ENDPOINT = ConfigEntry("atlas.rest.address", "http://localhost:21000")
+
+ val CLIENT_TYPE = ConfigEntry("atlas.client.type", "rest")
+ val CLIENT_USERNAME = ConfigEntry("atlas.client.username", null)
+ val CLIENT_PASSWORD = ConfigEntry("atlas.client.password", null)
+
+ val CLUSTER_NAME = ConfigEntry("atlas.cluster.name", "primary")
+
+ val COLUMN_LINEAGE_ENABLED = ConfigEntry("atlas.hook.spark.column.lineage.enabled", "true")
+}
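To make the lookup order concrete, an illustrative read of this configuration (not part of the patch; it assumes an active SparkContext, since `getConf()` copies `spark.atlas.*` entries from the global context over the `atlas-application.properties` values before falling back to each `ConfigEntry` default):

    // Resolution order per AtlasClientConf above:
    //   1. atlas-application.properties, loaded via ApplicationProperties.get()
    //   2. overridden by SparkConf entries with the `spark.atlas.` prefix
    //      (spark.atlas.cluster.name -> atlas.cluster.name, etc.)
    //   3. the ConfigEntry default when neither source sets the key
    val clientConf = AtlasClientConf.getConf() // needs a live SparkContext
    val endpoint = clientConf.get(AtlasClientConf.ATLAS_REST_ENDPOINT)
    // => "http://localhost:21000" unless overridden
    val cluster = clientConf.get(AtlasClientConf.CLUSTER_NAME)
    // => "primary" by default; "test" with the test resources in this patch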
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
new file mode 100644
index 000000000..9575b5258
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasEntityHelper.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import scala.collection.JavaConverters._
+
+import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId, AtlasRelatedObjectId}
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper
+
+/**
+ * The helpers for Atlas spark entities from Lineage.
+ * The Atlas spark models refer to :
+ *
https://github.com/apache/atlas/blob/master/addons/models/1000-Hadoop/1100-spark_model.json
+ */
+object AtlasEntityHelper {
+
+ /**
+ * Generate `spark_process` Atlas Entity from Lineage.
+ * @param qe
+ * @param lineage
+ * @return
+ */
+ def processEntity(qe: QueryExecution, lineage: Lineage): AtlasEntity = {
+ val entity = new AtlasEntity(PROCESS_TYPE)
+
+ val appId = SparkContextHelper.globalSparkContext.applicationId
+ val appName = SparkContextHelper.globalSparkContext.appName match {
+ case "Spark shell" => s"Spark Job $appId"
+ case default => s"$default $appId"
+ }
+
+ entity.setAttribute("qualifiedName", appId)
+ entity.setAttribute("name", appName)
+ entity.setAttribute("currUser", SparkListenerHelper.currentUser)
+ SparkListenerHelper.sessionUser.foreach(entity.setAttribute("remoteUser", _))
+ entity.setAttribute("executionId", qe.id)
+ entity.setAttribute("details", qe.toString())
+ entity.setAttribute("sparkPlanDescription", qe.sparkPlan.toString())
+
+ // TODO add entity type instead of parsing from string
+ val inputs = lineage.inputTables.flatMap(tableObjectId).map { objId =>
+ relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
+ }
+ val outputs = lineage.outputTables.flatMap(tableObjectId).map { objId =>
+ relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
+ }
+
+ entity.setRelationshipAttribute("inputs", inputs.asJava)
+ entity.setRelationshipAttribute("outputs", outputs.asJava)
+
+ entity
+ }
+
+ /**
+ * Generate `spark_column_lineage` Atlas Entity from Lineage.
+ * @param processEntity
+ * @param lineage
+ * @return
+ */
+ def columnLineageEntities(processEntity: AtlasEntity, lineage: Lineage): Seq[AtlasEntity] = {
+ lineage.columnLineage.flatMap(columnLineage => {
+ val inputs = columnLineage.originalColumns.flatMap(columnObjectId).map { objId =>
+ relatedObjectId(objId, RELATIONSHIP_DATASET_PROCESS_INPUTS)
+ }
+ val outputs = Option(columnLineage.column).flatMap(columnObjectId).map { objId =>
+ relatedObjectId(objId, RELATIONSHIP_PROCESS_DATASET_OUTPUTS)
+ }.toSeq
+
+ if (inputs.nonEmpty && outputs.nonEmpty) {
+ val entity = new AtlasEntity(COLUMN_LINEAGE_TYPE)
+ val qualifiedName =
+ s"${processEntity.getAttribute("qualifiedName")}:${columnLineage.column}"
+ entity.setAttribute("qualifiedName", qualifiedName)
+ entity.setAttribute("name", qualifiedName)
+ entity.setRelationshipAttribute("inputs", inputs.asJava)
+ entity.setRelationshipAttribute("outputs", outputs.asJava)
+ entity.setRelationshipAttribute(
+ "process",
+ relatedObjectId(objectId(processEntity), RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE))
+ Some(entity)
+ } else {
+ None
+ }
+ })
+ }
+
+ def tableObjectId(tableName: String): Option[AtlasObjectId] = {
+ val dbTb = tableName.split('.')
+ if (dbTb.length == 2) {
+ val qualifiedName = tableQualifiedName(cluster, dbTb(0), dbTb(1))
+ // TODO parse datasource type
+ Some(new AtlasObjectId(HIVE_TABLE_TYPE, "qualifiedName", qualifiedName))
+ } else {
+ None
+ }
+ }
+
+ def tableQualifiedName(cluster: String, db: String, table: String): String = {
+ s"${db.toLowerCase}.${table.toLowerCase}@$cluster"
+ }
+
+ def columnObjectId(columnName: String): Option[AtlasObjectId] = {
+ val dbTbCol = columnName.split('.')
+ if (dbTbCol.length == 3) {
+ val qualifiedName = columnQualifiedName(cluster, dbTbCol(0), dbTbCol(1), dbTbCol(2))
+ // TODO parse datasource type
+ Some(new AtlasObjectId(HIVE_COLUMN_TYPE, "qualifiedName", qualifiedName))
+ } else {
+ None
+ }
+ }
+
+ def columnQualifiedName(cluster: String, db: String, table: String, column: String): String = {
+ s"${db.toLowerCase}.${table.toLowerCase}.${column.toLowerCase}@$cluster"
+ }
+
+ def objectId(entity: AtlasEntity): AtlasObjectId = {
+ val objId = new AtlasObjectId(entity.getGuid, entity.getTypeName)
+ objId.setUniqueAttributes(Map("qualifiedName" -> entity.getAttribute("qualifiedName")).asJava)
+ objId
+ }
+
+ def relatedObjectId(objectId: AtlasObjectId, relationshipType: String): AtlasRelatedObjectId = {
+ new AtlasRelatedObjectId(objectId, relationshipType)
+ }
+
+ lazy val cluster = AtlasClientConf.getConf().get(AtlasClientConf.CLUSTER_NAME)
+ lazy val columnLineageEnabled =
+ AtlasClientConf.getConf().get(AtlasClientConf.COLUMN_LINEAGE_ENABLED).toBoolean
+
+ val HIVE_TABLE_TYPE = "hive_table"
+ val HIVE_COLUMN_TYPE = "hive_column"
+ val PROCESS_TYPE = "spark_process"
+ val COLUMN_LINEAGE_TYPE = "spark_column_lineage"
+ val RELATIONSHIP_DATASET_PROCESS_INPUTS = "dataset_process_inputs"
+ val RELATIONSHIP_PROCESS_DATASET_OUTPUTS = "process_dataset_outputs"
+ val RELATIONSHIP_SPARK_PROCESS_COLUMN_LINEAGE = "spark_process_column_lineages"
+
+}
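For orientation, an illustrative use of the qualified-name helpers above (not part of the patch; the cluster value comes from `atlas.cluster.name`, assumed here to be `primary`):

    // Names are lower-cased and suffixed with "@<cluster>", matching the
    // hive_table / hive_column qualified-name convention.
    AtlasEntityHelper.tableQualifiedName("primary", "default", "Test_Table1")
    // => "default.test_table1@primary"
    AtlasEntityHelper.columnQualifiedName("primary", "default", "test_table1", "D")
    // => "default.test_table1.d@primary"

    // tableObjectId/columnObjectId only produce an id for fully qualified
    // db.table (or db.table.column) names; anything else yields None.
    // They read the cluster from AtlasClientConf, so a SparkContext must be up.
    AtlasEntityHelper.tableObjectId("default.test_table1")
    // => Some(AtlasObjectId(hive_table, qualifiedName, "default.test_table1@primary"))
    AtlasEntityHelper.tableObjectId("test_table1")
    // => None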
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
new file mode 100644
index 000000000..c66b51107
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcher.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.lineage.{Lineage, LineageDispatcher}
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.columnLineageEnabled
+
+class AtlasLineageDispatcher extends LineageDispatcher with Logging {
+
+ override def send(qe: QueryExecution, lineageOpt: Option[Lineage]): Unit = {
+ try {
+ lineageOpt.filter(l => l.inputTables.nonEmpty || l.outputTables.nonEmpty).foreach(lineage => {
+ val processEntity = AtlasEntityHelper.processEntity(qe, lineage)
+ val columnLineageEntities = if (lineage.columnLineage.nonEmpty && columnLineageEnabled) {
+ AtlasEntityHelper.columnLineageEntities(processEntity, lineage)
+ } else {
+ Seq.empty
+ }
+ AtlasClient.getClient().send(processEntity +: columnLineageEntities)
+ })
+ } catch {
+ case t: Throwable =>
+ logWarning("Send lineage to atlas failed.", t)
+ }
+ }
+
+ override def onFailure(qe: QueryExecution, exception: Exception): Unit = {
+ // ignore
+ }
+
+}
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
similarity index 82%
copy from
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
copy to
extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
index d6afea152..3f9d9831d 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/LineageDispatcherType.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/ConfigEntry.scala
@@ -15,10 +15,6 @@
* limitations under the License.
*/
-package org.apache.kyuubi.plugin.lineage
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
-object LineageDispatcherType extends Enumeration {
- type LineageDispatcherType = Value
-
- val SPARK_EVENT, KYUUBI_EVENT = Value
-}
+case class ConfigEntry(key: String, defaultValue: String)
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
index 57db55a1e..a1747493e 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkListenerHelper.scala
@@ -17,7 +17,9 @@
package org.apache.kyuubi.plugin.lineage.helper
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SPARK_VERSION
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
import org.apache.kyuubi.util.SemanticVersion
@@ -40,4 +42,11 @@ object SparkListenerHelper {
def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}
+
+ def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
+
+ def sessionUser: Option[String] =
+ Option(SparkContextHelper.globalSparkContext.getLocalProperty(KYUUBI_SESSION_USER))
+
+ final val KYUUBI_SESSION_USER = "kyuubi.session.user"
}
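Worth noting: `sessionUser` comes from a Spark local property, so the `remoteUser` attribute on the `spark_process` entity is only populated when something (normally the Kyuubi engine) has set that property. A hedged illustration, not part of this patch:

    // Illustrative only: make the session user visible to the lineage dispatcher.
    // Without this, only `currUser` (the UGI short name) is recorded.
    SparkContextHelper.globalSparkContext
      .setLocalProperty(SparkListenerHelper.KYUUBI_SESSION_USER, "alice")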
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
index 6fb5399c0..5b7d3dfe1 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/spark/kyuubi/lineage/LineageConf.scala
@@ -35,6 +35,7 @@ object LineageConf {
"`org.apache.kyuubi.plugin.lineage.LineageDispatcher` for dispatching
lineage events.<ul>" +
"<li>SPARK_EVENT: send lineage event to spark event bus</li>" +
"<li>KYUUBI_EVENT: send lineage event to kyuubi event bus</li>" +
+ "<li>ATLAS: send lineage to apache atlas</li>" +
"</ul>")
.version("1.8.0")
.stringConf
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
new file mode 100644
index 000000000..e6dc52f98
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+atlas.cluster.name=test
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
new file mode 100644
index 000000000..cb98c52ef
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/dispatcher/atlas/AtlasLineageDispatcherSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.dispatcher.atlas
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.List
+
+import org.apache.atlas.model.instance.{AtlasEntity, AtlasObjectId}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.kyuubi.lineage.LineageConf.{DISPATCHERS, SKIP_PARSING_PERMANENT_VIEW_ENABLED}
+import org.apache.spark.kyuubi.lineage.SparkContextHelper
+import org.apache.spark.sql.SparkListenerExtensionTest
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.plugin.lineage.Lineage
+import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
+import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
+
+class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
+ val catalogName =
+ if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
+ else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+
+ override protected val catalogImpl: String = "hive"
+
+ override def sparkConf(): SparkConf = {
+ super.sparkConf()
+ .set("spark.sql.catalog.v2_catalog", catalogName)
+ .set(
+ "spark.sql.queryExecutionListeners",
+
"org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
+ .set(DISPATCHERS.key, "ATLAS")
+ .set(SKIP_PARSING_PERMANENT_VIEW_ENABLED.key, "true")
+ }
+
+ override def afterAll(): Unit = {
+ spark.stop()
+ super.afterAll()
+ }
+
+ test("altas lineage capture: insert into select sql") {
+ val mockAtlasClient = new MockAtlasClient()
+ AtlasClient.setClient(mockAtlasClient)
+
+ withTable("test_table0") { _ =>
+ spark.sql("create table test_table0(a string, b int, c int)")
+ spark.sql("create table test_table1(a string, d int)")
+ spark.sql("insert into test_table1 select a, b + c as d from
test_table0").collect()
+ val expected = Lineage(
+ List("default.test_table0"),
+ List("default.test_table1"),
+ List(
+ ("default.test_table1.a", Set("default.test_table0.a")),
+ ("default.test_table1.d", Set("default.test_table0.b",
"default.test_table0.c"))))
+ eventually(Timeout(5.seconds)) {
+ assert(mockAtlasClient.getEntities != null &&
mockAtlasClient.getEntities.nonEmpty)
+ }
+ checkAtlasProcessEntity(mockAtlasClient.getEntities.head, expected)
+ checkAtlasColumnLineageEntities(
+ mockAtlasClient.getEntities.head,
+ mockAtlasClient.getEntities.tail,
+ expected)
+ }
+
+ }
+
+ def checkAtlasProcessEntity(entity: AtlasEntity, expected: Lineage): Unit = {
+ assert(entity.getTypeName == PROCESS_TYPE)
+
+ val appId = SparkContextHelper.globalSparkContext.applicationId
+ assert(entity.getAttribute("qualifiedName") == appId)
+ assert(entity.getAttribute("name")
+ == s"${SparkContextHelper.globalSparkContext.appName} $appId")
+
assert(StringUtils.isNotBlank(entity.getAttribute("currUser").asInstanceOf[String]))
+ assert(entity.getAttribute("executionId") != null)
+
assert(StringUtils.isNotBlank(entity.getAttribute("details").asInstanceOf[String]))
+
assert(StringUtils.isNotBlank(entity.getAttribute("sparkPlanDescription").asInstanceOf[String]))
+
+ val inputs = entity.getRelationshipAttribute("inputs")
+
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
+ val outputs = entity.getRelationshipAttribute("outputs")
+
.asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
+ assertResult(expected.inputTables.map(s => s"$s@$cluster"))(inputs)
+ assertResult(expected.outputTables.map(s => s"$s@$cluster"))(outputs)
+ }
+
+ def checkAtlasColumnLineageEntities(
+ processEntity: AtlasEntity,
+ entities: Seq[AtlasEntity],
+ expected: Lineage): Unit = {
+ assert(entities.size == expected.columnLineage.size)
+
+ entities.zip(expected.columnLineage).foreach {
+ case (entity, expectedLineage) =>
+ assert(entity.getTypeName == COLUMN_LINEAGE_TYPE)
+ val expectedQualifiedName =
+ s"${processEntity.getAttribute("qualifiedName")}:${expectedLineage.column}"
+ assert(entity.getAttribute("qualifiedName") == expectedQualifiedName)
+ assert(entity.getAttribute("name") == expectedQualifiedName)
+
+ val inputs = entity.getRelationshipAttribute("inputs")
+ .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
+ assertResult(expectedLineage.originalColumns.map(s => s"$s@$cluster"))(inputs.toSet)
+
+ val outputs = entity.getRelationshipAttribute("outputs")
+ .asInstanceOf[util.Collection[AtlasObjectId]].asScala.map(getQualifiedName)
+ assert(outputs.size == 1)
+ assert(s"${expectedLineage.column}@$cluster" == outputs.head)
+
+ assert(getQualifiedName(entity.getRelationshipAttribute("process").asInstanceOf[
+ AtlasObjectId]) == processEntity.getAttribute("qualifiedName"))
+ }
+ }
+
+ // Pre-set cluster name for testing in `test/resources/atlas-application.properties`
+ private val cluster = "test"
+
+ def getQualifiedName(objId: AtlasObjectId): String = {
+ objId.getUniqueAttributes.get("qualifiedName").asInstanceOf[String]
+ }
+
+ class MockAtlasClient() extends AtlasClient {
+ private var _entities: Seq[AtlasEntity] = _
+
+ override def send(entities: Seq[AtlasEntity]): Unit = {
+ _entities = entities
+ }
+
+ def getEntities: Seq[AtlasEntity] = _entities
+
+ override def close(): Unit = {}
+ }
+}
diff --git a/pom.xml b/pom.xml
index 3b08ef622..515c15af9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
<antlr4.version>4.9.3</antlr4.version>
<antlr.st4.version>4.3.4</antlr.st4.version>
<apache.archive.dist>https://archive.apache.org/dist</apache.archive.dist>
+ <atlas.version>2.3.0</atlas.version>
<bouncycastle.version>1.67</bouncycastle.version>
<codahale.metrics.version>4.2.8</codahale.metrics.version>
<commons-cli.version>1.5.0</commons-cli.version>