This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new cd1c3d026 ATLAS-4746: hive_process and hive_process_execution (lineage) being generated for simple DML UPDATE queries run via hive
cd1c3d026 is described below
commit cd1c3d02694fe60a749f0597580dd43808e3961f
Author: radhikakundam <[email protected]>
AuthorDate: Wed May 17 10:40:21 2023 -0700
ATLAS-4746: hive_process and hive_process_execution (lineage) being
generated for simple DML UPDATE queries run via hive
Signed-off-by: radhikakundam <[email protected]>
(cherry picked from commit 1443f25a46784dfa63892daa74744b19b5c2a71e)
---
.../atlas/hive/hook/events/CreateHiveProcess.java | 70 +++++++++++++++-------
1 file changed, 47 insertions(+), 23 deletions(-)
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index bc2c91a25..fed4ece41 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -41,6 +41,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -72,28 +73,21 @@ public class CreateHiveProcess extends BaseHiveEvent {
if (!skipProcess()) {
List<AtlasEntity> inputs = new ArrayList<>();
List<AtlasEntity> outputs = new ArrayList<>();
- Set<String> processedInputNames = new HashSet<>();
- Set<String> processedOutputNames = new HashSet<>();
+ Set<String> processedNames = new HashSet<>();
ret = new AtlasEntitiesWithExtInfo();
+ Map<String, Entity> inputByQualifiedName = new HashMap<>();
+ Map<String, Entity> outputByQualifiedName = new HashMap<>();
+
if (getInputs() != null) {
for (ReadEntity input : getInputs()) {
String qualifiedName = getQualifiedName(input);
- if (qualifiedName == null ||
!processedInputNames.add(qualifiedName)) {
+ if (qualifiedName == null) {
continue;
}
-
- AtlasEntity entity = getInputOutputEntity(input, ret,
skipTempTables);
-
- if (!input.isDirect()) {
- continue;
- }
-
- if (entity != null) {
- inputs.add(entity);
- }
+ inputByQualifiedName.put(qualifiedName, input);
}
}
@@ -101,27 +95,53 @@ public class CreateHiveProcess extends BaseHiveEvent {
for (WriteEntity output : getOutputs()) {
String qualifiedName = getQualifiedName(output);
- if (qualifiedName == null ||
!processedOutputNames.add(qualifiedName)) {
+ if (qualifiedName == null) {
continue;
}
+ outputByQualifiedName.put(qualifiedName, output);
+ }
+ }
- AtlasEntity entity = getInputOutputEntity(output, ret,
skipTempTables);
+ for (String outputQualifiedName : outputByQualifiedName.keySet()) {
+ WriteEntity output = (WriteEntity)
outputByQualifiedName.get(outputQualifiedName);
+ AtlasEntity entity = getInputOutputEntity(output, ret,
skipTempTables);
- if (entity != null) {
- outputs.add(entity);
- }
+ if (checkIfOnlySelfLineagePossible(outputQualifiedName,
inputByQualifiedName) || !processedNames.add(outputQualifiedName)) {
+ continue;
+ }
- if (isDdlOperation(entity)) {
+ if (entity != null) {
+ outputs.add(entity);
+ }
- AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+ if (isDdlOperation(entity)) {
- if (ddlEntity != null) {
- ret.addEntity(ddlEntity);
- }
+ AtlasEntity ddlEntity = createHiveDDLEntity(entity);
+
+ if (ddlEntity != null) {
+ ret.addEntity(ddlEntity);
}
}
}
+ for (String inputQualifiedName : inputByQualifiedName.keySet()) {
+ ReadEntity input = (ReadEntity)
inputByQualifiedName.get(inputQualifiedName);
+
+ if (!processedNames.add(inputQualifiedName)) {
+ continue;
+ }
+
+ AtlasEntity entity = getInputOutputEntity(input, ret,
skipTempTables);
+
+ if (!input.isDirect()) {
+ continue;
+ }
+
+ if (entity != null) {
+ inputs.add(entity);
+ }
+ }
+
boolean skipProcess = inputs.isEmpty() && outputs.isEmpty();
if (!skipProcess) {
@@ -151,6 +171,10 @@ public class CreateHiveProcess extends BaseHiveEvent {
return ret;
}
+ private boolean checkIfOnlySelfLineagePossible(String outputQualifiedName,
Map<String, Entity> inputByQualifiedName) {
+ return inputByQualifiedName.size() == 1 &&
inputByQualifiedName.containsKey(outputQualifiedName);
+ }
+
private void processColumnLineage(AtlasEntity hiveProcess,
AtlasEntitiesWithExtInfo entities) {
LineageInfo lineageInfo = getLineageInfo();