Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 be6a4ffa0 -> 11555f7e1


ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage - #3


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/11555f7e
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/11555f7e
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/11555f7e

Branch: refs/heads/branch-0.8
Commit: 11555f7e1f3a385b613adb7f95ea9fffd0936876
Parents: be6a4ff
Author: Madhan Neethiraj <[email protected]>
Authored: Fri Oct 12 07:31:39 2018 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Fri Oct 12 09:26:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java    |  2 +-
 .../hive/hook/events/CreateHiveProcess.java     | 16 ++++++---------
 .../notification/NotificationHookConsumer.java  | 21 ++++++++------------
 3 files changed, 15 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 673fa3b..501eb63 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         nameCacheTableMaxCount          = 
atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
         nameCacheRebuildIntervalSeconds = 
atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 
minutes default
         skipHiveColumnLineageHive20633                = 
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
-        skipHiveColumnLineageHive20633InputsThreshold = 
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 5); // skip greater-than 5 inputs by default
+        skipHiveColumnLineageHive20633InputsThreshold = 
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
 
         knownObjects = nameCacheEnabled ? new 
HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, 
nameCacheRebuildIntervalSeconds) : null;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index 12a65bd..9b55c5f 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -132,9 +132,8 @@ public class CreateHiveProcess extends BaseHiveEvent {
             return;
         }
 
-        final List<AtlasEntity> columnLineages    = new ArrayList<>();
-        boolean                 isSameInputsSize  = true;
-        int                     lineageInputsSize = -1;
+        final List<AtlasEntity> columnLineages     = new ArrayList<>();
+        int                     lineageInputsCount = 0;
 
         for (Map.Entry<DependencyKey, Dependency> entry : 
lineageInfo.entrySet()) {
             String      outputColName = getQualifiedName(entry.getKey());
@@ -169,11 +168,7 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 continue;
             }
 
-            if (lineageInputsSize == -1) {
-                lineageInputsSize = inputColumns.size();
-            } else if (lineageInputsSize != inputColumns.size()) {
-                isSameInputsSize = false;
-            }
+            lineageInputsCount += inputColumns.size();
 
             AtlasEntity columnLineageProcess = new 
AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
 
@@ -188,14 +183,15 @@ public class CreateHiveProcess extends BaseHiveEvent {
             columnLineages.add(columnLineageProcess);
         }
 
-        boolean skipColumnLineage = 
context.getSkipHiveColumnLineageHive20633() && columnLineages.size() > 1 && 
isSameInputsSize && lineageInputsSize > 
context.getSkipHiveColumnLineageHive20633InputsThreshold();
+        float   avgInputsCount    = columnLineages.size() > 0 ? (((float) 
lineageInputsCount) / columnLineages.size()) : 0;
+        boolean skipColumnLineage = 
context.getSkipHiveColumnLineageHive20633() && avgInputsCount > 
context.getSkipHiveColumnLineageHive20633InputsThreshold();
 
         if (!skipColumnLineage) {
             for (AtlasEntity columnLineage : columnLineages) {
                 entities.addEntity(columnLineage);
             }
         } else {
-            LOG.warn("skipping {} hive_column_lineage entities, each having {} 
inputs", columnLineages.size(), lineageInputsSize);
+            LOG.warn("skipped {} hive_column_lineage entities. Average # of 
inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), 
avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), 
lineageInputsCount);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 753ea9e..97fd095 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -141,7 +141,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         maxWaitDuration       = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
true);
-        skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 5); // skip greater-than 5 inputs by default
+        skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
@@ -650,9 +650,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
 
         if (entities != null && entities.getEntities() != null) {
-            boolean isSameInputsSize  = true;
-            int     lineageInputsSize = -1;
-            int     lineageCount      = 0;
+            int lineageCount       = 0;
+            int lineageInputsCount = 0;
 
             // find if all hive_column_lineage entities have same number of 
inputs, which is likely to be caused by HIVE-20633 that results in incorrect 
lineage in some cases
             for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
@@ -666,18 +665,14 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     if (objInputs instanceof Collection) {
                         Collection inputs = (Collection) objInputs;
 
-                        if (lineageInputsSize == -1) { // first entry
-                            lineageInputsSize = inputs.size();
-                        } else if (inputs.size() != lineageInputsSize) {
-                            isSameInputsSize = false;
-
-                            break;
-                        }
+                        lineageInputsCount += inputs.size();
                     }
                 }
             }
 
-            if (isSameInputsSize && lineageCount > 1 && lineageInputsSize > 
skipHiveColumnLineageHive20633InputsThreshold) {
+            float avgInputsCount = lineageCount > 0 ? (((float) 
lineageInputsCount) / lineageCount) : 0;
+
+            if (avgInputsCount > 
skipHiveColumnLineageHive20633InputsThreshold) {
                 int numRemovedEntities = 0;
 
                 for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
@@ -691,7 +686,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                 }
 
                 if (numRemovedEntities > 0) {
-                    LOG.warn("removed {} hive_column_lineage entities, each 
having {} inputs. offset={}, partition={}", numRemovedEntities, 
lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                    LOG.warn("removed {} hive_column_lineage entities. Average 
# of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, 
partition={}", numRemovedEntities, avgInputsCount, 
skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, 
kafkaMessage.getOffset(), kafkaMessage.getPartition());
                 }
             }
         }

Reply via email to