Repository: atlas
Updated Branches:
  refs/heads/branch-1.0 6fd3805ae -> e21a2c479


ATLAS-2975: updated Hive hook to avoid duplicate column-lineage entities; also 
updated Atlas server to skip duplicate column-lineage entities

(cherry picked from commit ed795dc4c10ef56999ff57fa67739a1e126a2ccb)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e21a2c47
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e21a2c47
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e21a2c47

Branch: refs/heads/branch-1.0
Commit: e21a2c4792cd112725242f698513b93a2eb97e46
Parents: 6fd3805
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Mon Nov 19 13:58:11 2018 -0800
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Mon Nov 19 16:01:53 2018 -0800

----------------------------------------------------------------------
 .../hive/hook/events/CreateHiveProcess.java     | 13 ++++++--
 .../notification/NotificationHookConsumer.java  | 35 ++++++++++++++++----
 2 files changed, 39 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/e21a2c47/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 6d2517c..2ccfff4 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -136,8 +136,9 @@ public class CreateHiveProcess extends BaseHiveEvent {
             return;
         }
 
-        final List<AtlasEntity> columnLineages     = new ArrayList<>();
-        int                     lineageInputsCount = 0;
+        final List<AtlasEntity> columnLineages      = new ArrayList<>();
+        int                     lineageInputsCount  = 0;
+        final Set<String>       processedOutputCols = new HashSet<>();
 
         for (Map.Entry<DependencyKey, Dependency> entry : 
lineageInfo.entrySet()) {
             String      outputColName = getQualifiedName(entry.getKey());
@@ -153,6 +154,14 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 continue;
             }
 
+            if (processedOutputCols.contains(outputColName)) {
+                LOG.warn("column-lineage: duplicate for output-column {}", 
outputColName);
+
+                continue;
+            } else {
+                processedOutputCols.add(outputColName);
+            }
+
             List<AtlasEntity> inputColumns = new ArrayList<>();
 
             for (BaseColumnInfo baseColumn : getBaseCols(entry.getValue())) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/e21a2c47/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index da1be9e..1cde3d0 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -69,8 +69,10 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -91,6 +93,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private static final int    SC_BAD_REQUEST = 400;
     private static final String TYPE_HIVE_COLUMN_LINEAGE = 
"hive_column_lineage";
     private static final String ATTRIBUTE_INPUTS         = "inputs";
+    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
 
     private static final String THREADNAME_PREFIX = 
NotificationHookConsumer.class.getSimpleName();
     private static final String ATLAS_HOOK_TOPIC  = 
AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
@@ -688,14 +691,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
 
         if (entities != null && entities.getEntities() != null) {
-            int lineageCount       = 0;
-            int lineageInputsCount = 0;
+            int         lineageCount       = 0;
+            int         lineageInputsCount = 0;
+            int         numRemovedEntities = 0;
+            Set<String> lineageQNames      = new HashSet<>();
 
             // find if all hive_column_lineage entities have same number of 
inputs, which is likely to be caused by HIVE-20633 that results in incorrect 
lineage in some cases
             for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
                 AtlasEntity entity = iter.next();
 
                 if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
+                    final Object qName = 
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+
+                    if (qName != null) {
+                        final String qualifiedName = qName.toString();
+
+                        if (lineageQNames.contains(qualifiedName)) {
+                            iter.remove();
+
+                            LOG.warn("removed duplicate hive_column_lineage 
entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, 
lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+
+                            numRemovedEntities++;
+
+                            continue;
+                        } else {
+                            lineageQNames.add(qualifiedName);
+                        }
+                    }
+
                     lineageCount++;
 
                     Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
@@ -711,8 +734,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             float avgInputsCount = lineageCount > 0 ? (((float) 
lineageInputsCount) / lineageCount) : 0;
 
             if (avgInputsCount > 
skipHiveColumnLineageHive20633InputsThreshold) {
-                int numRemovedEntities = 0;
-
                 for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
                     AtlasEntity entity = iter.next();
 
@@ -722,10 +743,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                         numRemovedEntities++;
                     }
                 }
+            }
 
-                if (numRemovedEntities > 0) {
-                    LOG.warn("removed {} hive_column_lineage entities. Average 
# of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, 
partition={}", numRemovedEntities, avgInputsCount, 
skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, 
kafkaMessage.getOffset(), kafkaMessage.getPartition());
-                }
+            if (numRemovedEntities > 0) {
+                LOG.warn("removed {} hive_column_lineage entities. Average # 
of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, 
partition={}", numRemovedEntities, avgInputsCount, 
skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, 
kafkaMessage.getOffset(), kafkaMessage.getPartition());
             }
         }
     }

Reply via email to