Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 ea33708cf -> 20215f3dd


ATLAS-2891: updated hook notification processing with option to ignore potentially incorrect hive_column_lineage


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/20215f3d
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/20215f3d
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/20215f3d

Branch: refs/heads/branch-0.8
Commit: 20215f3dd74b16fe4a7a9c8eb21b17925256f4f9
Parents: ea33708
Author: Madhan Neethiraj <[email protected]>
Authored: Wed Sep 19 11:51:52 2018 -0700
Committer: Madhan Neethiraj <[email protected]>
Committed: Wed Sep 26 17:58:06 2018 -0700

----------------------------------------------------------------------
 .../atlas/hive/hook/AtlasHiveHookContext.java   |   8 ++
 .../org/apache/atlas/hive/hook/HiveHook.java    |  15 +++
 .../hive/hook/events/CreateHiveProcess.java     |  26 +++-
 .../apache/atlas/kafka/AtlasKafkaConsumer.java  |   9 +-
 .../notification/NotificationHookConsumer.java  | 121 ++++++++++++++++---
 5 files changed, 162 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index b9e4256..b467f4c 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -85,6 +85,14 @@ public class AtlasHiveHookContext {
         return hook.getClusterName();
     }
 
+    public boolean getSkipHiveColumnLineageHive20633() {
+        return hook.getSkipHiveColumnLineageHive20633();
+    }
+
+    public int getSkipHiveColumnLineageHive20633InputsThreshold() {
+        return hook.getSkipHiveColumnLineageHive20633InputsThreshold();
+    }
+
     public String getQualifiedName(Database db) {
         return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + 
getClusterName();
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 19075f6..673fa3b 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -51,6 +51,8 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     public static final String HOOK_NAME_CACHE_DATABASE_COUNT      = 
CONF_PREFIX + "name.cache.database.count";
     public static final String HOOK_NAME_CACHE_TABLE_COUNT         = 
CONF_PREFIX + "name.cache.table.count";
     public static final String HOOK_NAME_CACHE_REBUID_INTERVAL_SEC = 
CONF_PREFIX + "name.cache.rebuild.interval.seconds";
+    public static final String HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633        
          = CONF_PREFIX + "skip.hive_column_lineage.hive-20633";
+    public static final String 
HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = CONF_PREFIX + 
"skip.hive_column_lineage.hive-20633.inputs.threshold";
 
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
@@ -62,6 +64,9 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
     private static final int     nameCacheTableMaxCount;
     private static final int     nameCacheRebuildIntervalSeconds;
 
+    private static final boolean skipHiveColumnLineageHive20633;
+    private static final int     skipHiveColumnLineageHive20633InputsThreshold;
+
     private static HiveHookObjectNamesCache knownObjects = null;
 
     static {
@@ -74,6 +79,8 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         nameCacheDatabaseMaxCount       = 
atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
         nameCacheTableMaxCount          = 
atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
         nameCacheRebuildIntervalSeconds = 
atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60 
minutes default
+        skipHiveColumnLineageHive20633                = 
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
+        skipHiveColumnLineageHive20633InputsThreshold = 
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 5); // skip greater-than 5 inputs by default
 
         knownObjects = nameCacheEnabled ? new 
HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, 
nameCacheRebuildIntervalSeconds) : null;
     }
@@ -182,6 +189,14 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         return clusterName;
     }
 
+    public boolean getSkipHiveColumnLineageHive20633() {
+        return skipHiveColumnLineageHive20633;
+    }
+
+    public int getSkipHiveColumnLineageHive20633InputsThreshold() {
+        return skipHiveColumnLineageHive20633InputsThreshold;
+    }
+
 
     public static class HiveHookObjectNamesCache {
         private final int         dbMaxCacheCount;

http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
----------------------------------------------------------------------
diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
index c99a699..9b9852e 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java
@@ -132,10 +132,18 @@ public class CreateHiveProcess extends BaseHiveEvent {
             return;
         }
 
+        final List<AtlasEntity> columnLineages    = new ArrayList<>();
+        boolean                 isSameInputsSize  = true;
+        int                     lineageInputsSize = -1;
+
         for (Map.Entry<DependencyKey, Dependency> entry : 
lineageInfo.entrySet()) {
             String      outputColName = getQualifiedName(entry.getKey());
             AtlasEntity outputColumn  = context.getEntity(outputColName);
 
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("processColumnLineage(): DependencyKey={}; 
Dependency={}", entry.getKey(), entry.getValue());
+            }
+
             if (outputColumn == null) {
                 LOG.warn("column-lineage: non-existing output-column {}", 
outputColName);
 
@@ -161,6 +169,12 @@ public class CreateHiveProcess extends BaseHiveEvent {
                 continue;
             }
 
+            if (lineageInputsSize == -1) {
+                lineageInputsSize = inputColumns.size();
+            } else if (lineageInputsSize != inputColumns.size()) {
+                isSameInputsSize = false;
+            }
+
             AtlasEntity columnLineageProcess = new 
AtlasEntity(HIVE_TYPE_COLUMN_LINEAGE);
 
             columnLineageProcess.setAttribute(ATTRIBUTE_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_NAME) + ":" + 
outputColumn.getAttribute(ATTRIBUTE_NAME));
@@ -171,7 +185,17 @@ public class CreateHiveProcess extends BaseHiveEvent {
             columnLineageProcess.setAttribute(ATTRIBUTE_DEPENDENCY_TYPE, 
entry.getValue().getType());
             columnLineageProcess.setAttribute(ATTRIBUTE_EXPRESSION, 
entry.getValue().getExpr());
 
-            entities.addEntity(columnLineageProcess);
+            columnLineages.add(columnLineageProcess);
+        }
+
+        boolean skipColumnLineage = 
context.getSkipHiveColumnLineageHive20633() && isSameInputsSize && 
lineageInputsSize > context.getSkipHiveColumnLineageHive20633InputsThreshold();
+
+        if (!skipColumnLineage) {
+            for (AtlasEntity columnLineage : columnLineages) {
+                entities.addEntity(columnLineage);
+            }
+        } else {
+            LOG.warn("skipping {} hive_column_lineage entities, each having {} 
inputs", columnLineages.size(), lineageInputsSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java 
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index e3bb71c..c15da15 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -69,7 +69,14 @@ public class AtlasKafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
                             record.topic(), record.partition(), 
record.offset(), record.key(), record.value());
                 }
 
-                T message = 
deserializer.deserialize(record.value().toString());
+                T message = null;
+
+                try {
+                    message = 
deserializer.deserialize(record.value().toString());
+                } catch (OutOfMemoryError excp) {
+                    LOG.error("Ignoring message that failed to deserialize: 
topic={}, partition={}, offset={}, key={}, value={}",
+                              record.topic(), record.partition(), 
record.offset(), record.key(), record.value(), excp);
+                }
 
                 if (message == null) {
                     continue;

http://git-wip-us.apache.org/repos/asf/atlas/blob/20215f3d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 1a567af..6b34aa6 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -24,6 +24,7 @@ import org.apache.atlas.*;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -49,6 +50,7 @@ import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,9 +59,11 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -80,23 +84,31 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final String LOCALHOST = "localhost";
     private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
 
+    private static final String TYPE_HIVE_COLUMN_LINEAGE = 
"hive_column_lineage";
+    private static final String ATTRIBUTE_INPUTS         = "inputs";
+
     private static final String THREADNAME_PREFIX = 
NotificationHookConsumer.class.getSimpleName();
 
-    public static final String CONSUMER_THREADS_PROPERTY = 
"atlas.notification.hook.numthreads";
-    public static final String CONSUMER_RETRIES_PROPERTY = 
"atlas.notification.hook.maxretries";
+    public static final String CONSUMER_THREADS_PROPERTY         = 
"atlas.notification.hook.numthreads";
+    public static final String CONSUMER_RETRIES_PROPERTY         = 
"atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = 
"atlas.notification.hook.failedcachesize";
-    public static final String CONSUMER_RETRY_INTERVAL = 
"atlas.notification.consumer.retry.interval";
-    public static final String CONSUMER_MIN_RETRY_INTERVAL = 
"atlas.notification.consumer.min.retry.interval";
-    public static final String CONSUMER_MAX_RETRY_INTERVAL = 
"atlas.notification.consumer.max.retry.interval";
+    public static final String CONSUMER_RETRY_INTERVAL           = 
"atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_MIN_RETRY_INTERVAL       = 
"atlas.notification.consumer.min.retry.interval";
+    public static final String CONSUMER_MAX_RETRY_INTERVAL       = 
"atlas.notification.consumer.max.retry.interval";
+    public static final int    SERVER_READY_WAIT_TIME_MS         = 1000;
+
+    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633    
              = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
+    public static final String 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = 
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
 
 
-    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
-    private final AtlasEntityStore atlasEntityStore;
-    private final ServiceState serviceState;
+    private final AtlasEntityStore       atlasEntityStore;
+    private final ServiceState           serviceState;
     private final AtlasInstanceConverter instanceConverter;
-    private final AtlasTypeRegistry typeRegistry;
-    private final int maxRetries;
-    private final int failedMsgCacheSize;
+    private final AtlasTypeRegistry      typeRegistry;
+    private final int                    maxRetries;
+    private final int                    failedMsgCacheSize;
+    private final boolean                skipHiveColumnLineageHive20633;
+    private final int                    
skipHiveColumnLineageHive20633InputsThreshold;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -122,11 +134,17 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         this.applicationProperties = ApplicationProperties.get();
 
-        maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 
3);
-        failedMsgCacheSize = 
applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
+        maxRetries            = 
applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
+        failedMsgCacheSize    = 
applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
         consumerRetryInterval = 
applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
-        minWaitDuration = 
applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
-        maxWaitDuration = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
+        minWaitDuration       = 
applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
+        maxWaitDuration       = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
+
+        skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
true);
+        skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 5); // skip greater-than 5 inputs by default
+
+        LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
+        LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
     }
 
     @Override
@@ -350,6 +368,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     return;
                 }
 
+                preProcessNotificationMessage(kafkaMsg);
+
                 // Used for intermediate conversions during create and update
                 for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
                     if (LOG.isDebugEnabled()) {
@@ -603,6 +623,77 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
+    private void 
preProcessNotificationMessage(AtlasKafkaMessage<HookNotificationMessage> 
kafkaMsg) {
+        skipHiveColumnLineage(kafkaMsg);
+    }
+
+    private void 
skipHiveColumnLineage(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) {
+        if (!skipHiveColumnLineageHive20633) {
+            return;
+        }
+
+        final HookNotificationMessage  message = kafkaMessage.getMessage();
+        final AtlasEntitiesWithExtInfo entities;
+
+        switch (message.getType()) {
+            case ENTITY_CREATE_V2:
+                entities = ((EntityCreateRequestV2) message).getEntities();
+            break;
+
+            case ENTITY_FULL_UPDATE_V2:
+                entities = ((EntityUpdateRequestV2) message).getEntities();
+                break;
+
+            default:
+                entities = null;
+            break;
+        }
+
+        if (entities != null && entities.getEntities() != null) {
+            boolean isSameInputsSize  = true;
+            int     lineageInputsSize = -1;
+
+            // find if all hive_column_lineage entities have same number of 
inputs, which is likely to be caused by HIVE-20633 that results in incorrect 
lineage in some cases
+            for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
+                AtlasEntity entity = iter.next();
+
+                if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
+                    Object objInputs = entity.getAttribute(ATTRIBUTE_INPUTS);
+
+                    if (objInputs instanceof Collection) {
+                        Collection inputs = (Collection) objInputs;
+
+                        if (lineageInputsSize == -1) { // first entry
+                            lineageInputsSize = inputs.size();
+                        } else if (inputs.size() != lineageInputsSize) {
+                            isSameInputsSize = false;
+
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if (isSameInputsSize && lineageInputsSize > 
skipHiveColumnLineageHive20633InputsThreshold) {
+                int numRemovedEntities = 0;
+
+                for (ListIterator<AtlasEntity> iter = 
entities.getEntities().listIterator(); iter.hasNext(); ) {
+                    AtlasEntity entity = iter.next();
+
+                    if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
+                        iter.remove();
+
+                        numRemovedEntities++;
+                    }
+                }
+
+                if (numRemovedEntities > 0) {
+                    LOG.warn("removed {} hive_column_lineage entities, each 
having {} inputs. offset={}, partition={}", numRemovedEntities, 
lineageInputsSize, kafkaMessage.getOffset(), kafkaMessage.getPartition());
+                }
+            }
+        }
+    }
+
     private void audit(String messageUser, String method, String path) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> audit({},{}, {})", messageUser, method, path);

Reply via email to