Repository: atlas Updated Branches: refs/heads/branch-0.8 be6a4ffa0 -> 11555f7e1
ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage - #3 Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/11555f7e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/11555f7e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/11555f7e Branch: refs/heads/branch-0.8 Commit: 11555f7e1f3a385b613adb7f95ea9fffd0936876 Parents: be6a4ff Author: Madhan Neethiraj <[email protected]> Authored: Fri Oct 12 07:31:39 2018 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Oct 12 09:26:50 2018 -0700 ---------------------------------------------------------------------- .../org/apache/atlas/hive/hook/HiveHook.java | 2 +- .../hive/hook/events/CreateHiveProcess.java | 16 ++++++--------- .../notification/NotificationHookConsumer.java | 21 ++++++++------------ 3 files changed, 15 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 673fa3b..501eb63 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -80,7 +80,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { nameCacheTableMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000); nameCacheRebuildIntervalSeconds = atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 minutes default skipHiveColumnLineageHive20633 = atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); - skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + skipHiveColumnLineageHive20633InputsThreshold = atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 knownObjects = nameCacheEnabled ? new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds) : null; } http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 12a65bd..9b55c5f 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -132,9 +132,8 @@ public class CreateHiveProcess extends BaseHiveEvent { return; } - final List<AtlasEntity> columnLineages = new ArrayList<>(); - boolean isSameInputsSize = true; - int lineageInputsSize = -1; + final List<AtlasEntity> columnLineages = new ArrayList<>(); + int lineageInputsCount = 0; for (Map.Entry<DependencyKey, Dependency> entry : lineageInfo.entrySet()) { String outputColName = getQualifiedName(entry.getKey()); @@ -169,11 +168,7 @@ public class CreateHiveProcess extends BaseHiveEvent { continue; } - if (lineageInputsSize == -1) { - lineageInputsSize = inputColumns.size(); - } else if (lineageInputsSize != inputColumns.size()) { - isSameInputsSize = false; - } + lineageInputsCount += inputColumns.size(); AtlasEntity columnLineageProcess = new AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE); @@ -188,14 +183,15 @@ public class CreateHiveProcess extends BaseHiveEvent { columnLineages.add(columnLineageProcess); } - boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && columnLineages.size() > 1 && isSameInputsSize && lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold(); + float avgInputsCount = columnLineages.size() > 0 ? (((float) lineageInputsCount) / columnLineages.size()) : 0; + boolean skipColumnLineage = context.getSkipHiveColumnLineageHive20633() && avgInputsCount > context.getSkipHiveColumnLineageHive20633InputsThreshold(); if (!skipColumnLineage) { for (AtlasEntity columnLineage : columnLineages) { entities.addEntity(columnLineage); } } else { - LOG.warn("skipping {} hive_column_lineage entities, each having {} inputs", columnLineages.size(), lineageInputsSize); + LOG.warn("skipped {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}", columnLineages.size(), avgInputsCount, context.getSkipHiveColumnLineageHive20633InputsThreshold(), lineageInputsCount); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/11555f7e/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 753ea9e..97fd095 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -141,7 +141,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default skipHiveColumnLineageHive20633 = applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true); - skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 5); // skip greater-than 5 inputs by default + skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15 LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); @@ -650,9 +650,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (entities != null && entities.getEntities() != null) { - boolean isSameInputsSize = true; - int lineageInputsSize = -1; - int lineageCount = 0; + int lineageCount = 0; + int lineageInputsCount = 0; // find if all hive_column_lineage entities have same number of inputs, which is likely to be caused by HIVE-20633 that results in incorrect lineage in some cases for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { @@ -666,18 +665,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (objInputs instanceof Collection) { Collection inputs = (Collection) objInputs; - if (lineageInputsSize == -1) { // first entry - lineageInputsSize = inputs.size(); - } else if (inputs.size() != lineageInputsSize) { - isSameInputsSize = false; - - break; - } + lineageInputsCount += inputs.size(); } } } - if (isSameInputsSize && lineageCount > 1 && lineageInputsSize > skipHiveColumnLineageHive20633InputsThreshold) { + float avgInputsCount = lineageCount > 0 ? (((float) lineageInputsCount) / lineageCount) : 0; + + if (avgInputsCount > skipHiveColumnLineageHive20633InputsThreshold) { int numRemovedEntities = 0; for (ListIterator<AtlasEntity> iter = entities.getEntities().listIterator(); iter.hasNext(); ) { @@ -691,7 +686,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (numRemovedEntities > 0) { - LOG.warn("removed {} hive_column_lineage entities, each having {} inputs. offset={}, partition={}", numRemovedEntities, lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition()); + LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", numRemovedEntities, avgInputsCount, skipHiveColumnLineageHive20633InputsThreshold, lineageInputsCount, kafkaMessage.getOffset(), kafkaMessage.getPartition()); } } }
