This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 10bcaa8  ATLAS-3621: updated HiveHook to not save query-string in 
multiple attributes - queryText and name
10bcaa8 is described below

commit 10bcaa8091b548efd82e993d6844623f93f1e1f8
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Fri Feb 14 18:02:26 2020 -0800

    ATLAS-3621: updated HiveHook to not save query-string in multiple 
attributes - queryText and name
---
 .../atlas/hive/hook/events/BaseHiveEvent.java      |  4 +--
 .../notification/NotificationHookConsumer.java     | 38 ++++++++++++++++++++--
 .../preprocessor/HivePreprocessor.java             |  8 +++++
 .../preprocessor/PreprocessorContext.java          | 28 +++++++++-------
 .../NotificationHookConsumerKafkaTest.java         | 13 +++++++-
 .../notification/NotificationHookConsumerTest.java |  3 +-
 6 files changed, 75 insertions(+), 19 deletions(-)

diff --git 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 0409d8a..35d85f3 100644
--- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -646,7 +646,6 @@ public abstract class BaseHiveEvent {
         if (queryStr != null) {
             queryStr = queryStr.toLowerCase().trim();
         }
-        ret.setAttribute(ATTRIBUTE_NAME, queryStr);
 
         ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
         String qualifiedName = getQualifiedName(inputs, outputs);
@@ -661,6 +660,7 @@ public abstract class BaseHiveEvent {
             }
         }
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
         ret.setRelationshipAttribute(ATTRIBUTE_INPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIds(inputs, 
RELATIONSHIP_DATASET_PROCESS_INPUTS));
         ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS, 
AtlasTypeUtil.getAtlasRelatedObjectIds(outputs, 
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
 
@@ -697,7 +697,7 @@ public abstract class BaseHiveEvent {
         ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, 
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
                 QNAME_SEP_PROCESS + getQueryStartTime().toString() +
                 QNAME_SEP_PROCESS + endTime.toString());
-        ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS + 
getQueryStartTime().toString());
+        ret.setAttribute(ATTRIBUTE_NAME, 
ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
         ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime());
         ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
         ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 14cae42..3f1ea05 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -146,6 +146,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES             = 
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
     public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED   = 
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
     public static final String 
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES           = 
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = 
"atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
     public static final String 
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = 
"atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String 
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = 
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
     public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER           
              = "atlas.notification.authorize.using.message.user";
@@ -165,6 +166,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final int                           commitBatchSize;
     private final boolean                       skipHiveColumnLineageHive20633;
     private final int                           
skipHiveColumnLineageHive20633InputsThreshold;
+    private final boolean                       
updateHiveProcessNameWithQualifiedName;
     private final int                           
largeMessageProcessingTimeThresholdMs;
     private final boolean                       consumerDisabled;
     private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
@@ -212,6 +214,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
false);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
+        updateHiveProcessNameWithQualifiedName        = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME,
 true);
         consumerDisabled                              = 
applicationProperties.getBoolean(CONSUMER_DISABLED, false);
         largeMessageProcessingTimeThresholdMs         = 
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
 60 * 1000);  //  60 sec by default
         createShellEntityForNonExistingReference      = 
AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
@@ -294,9 +297,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             hiveTablePrefixesToIgnore = Collections.emptyList();
         }
 
+        LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME, 
updateHiveProcessNameWithQualifiedName);
+
         hiveTypesRemoveOwnedRefAttrs  = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
         rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
-        preprocessEnabled             = skipHiveColumnLineageHive20633 || 
hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty();
+        preprocessEnabled             = skipHiveColumnLineageHive20633 || 
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || 
rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || 
!hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
@@ -584,6 +589,33 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     return;
                 }
 
+                // convert V1 messages to V2 to enable preProcess
+                try {
+                    switch (message.getType()) {
+                        case ENTITY_CREATE: {
+                            final EntityCreateRequest      createRequest = 
(EntityCreateRequest) message;
+                            final AtlasEntitiesWithExtInfo entities      = 
instanceConverter.toAtlasEntities(createRequest.getEntities());
+                            final EntityCreateRequestV2    v2Request     = new 
EntityCreateRequestV2(message.getUser(), entities);
+
+                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, 
kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
+                            message  = kafkaMsg.getMessage();
+                        }
+                        break;
+
+                        case ENTITY_FULL_UPDATE: {
+                            final EntityUpdateRequest      updateRequest = 
(EntityUpdateRequest) message;
+                            final AtlasEntitiesWithExtInfo entities      = 
instanceConverter.toAtlasEntities(updateRequest.getEntities());
+                            final EntityUpdateRequestV2    v2Request     = new 
EntityUpdateRequestV2(messageUser, entities);
+
+                            kafkaMsg = new AtlasKafkaMessage<>(v2Request, 
kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
+                            message  = kafkaMsg.getMessage();
+                        }
+                        break;
+                    }
+                } catch (AtlasBaseException excp) {
+                    LOG.error("handleMessage(): failed to convert V1 message 
to V2", message.getType().name());
+                }
+
                 PreprocessorContext context = 
preProcessNotificationMessage(kafkaMsg);
 
                 if (isEmptyMessage(kafkaMsg)) {
@@ -934,7 +966,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         PreprocessorContext context = null;
 
         if (preprocessEnabled) {
-            context = new PreprocessorContext(kafkaMsg, typeRegistry, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, 
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
+            context = new PreprocessorContext(kafkaMsg, typeRegistry, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, 
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs, 
updateHiveProcessNameWithQualifiedName);
 
             if (context.isHivePreprocessEnabled()) {
                 preprocessHiveTypes(context);
@@ -950,7 +982,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
             context.moveRegisteredReferredEntities();
 
-            if (context.isHivePreprocessEnabled() && 
CollectionUtils.isNotEmpty(context.getEntities())) {
+            if (context.isHivePreprocessEnabled() && 
CollectionUtils.isNotEmpty(context.getEntities()) && 
context.getEntities().size() > 1) {
                 // move hive_process and hive_column_lineage entities to end 
of the list
                 List<AtlasEntity> entities = context.getEntities();
                 int               count    = entities.size();
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index cc5fe52..4bb279d 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -162,6 +162,14 @@ public class HivePreprocessor {
 
         @Override
         public void preprocess(AtlasEntity entity, PreprocessorContext 
context) {
+            if (context.updateHiveProcessNameWithQualifiedName()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("setting {}.name={}. topic-offset={}, 
partition={}", entity.getTypeName(), 
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), context.getKafkaMessageOffset(), 
context.getKafkaPartition());
+                }
+
+                entity.setAttribute(ATTRIBUTE_NAME, 
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+            }
+
             if (context.isIgnoredEntity(entity.getGuid())) {
                 context.addToIgnoredEntities(entity); // so that this will be 
logged with typeName and qualifiedName
             } else {
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 3c58c9f..608b4a3 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -59,6 +59,7 @@ public class PreprocessorContext {
     private final List<String>                        
hiveDummyDatabasesToIgnore;
     private final List<String>                        hiveDummyTablesToIgnore;
     private final List<String>                        
hiveTablePrefixesToIgnore;
+    private final boolean                             
updateHiveProcessNameWithQualifiedName;
     private final boolean                             
hiveTypesRemoveOwnedRefAttrs;
     private final boolean                             
rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                             isHivePreProcessEnabled;
@@ -70,17 +71,18 @@ public class PreprocessorContext {
     private final Map<String, String>                 guidAssignments        = 
new HashMap<>();
     private       List<AtlasEntity>                   postUpdateEntities     = 
null;
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, 
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, 
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, 
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, 
boolean rdbmsTypesRemoveOwnedRefAttrs) {
-        this.kafkaMessage                  = kafkaMessage;
-        this.typeRegistry                  = typeRegistry;
-        this.hiveTablesToIgnore            = hiveTablesToIgnore;
-        this.hiveTablesToPrune             = hiveTablesToPrune;
-        this.hiveTablesCache               = hiveTablesCache;
-        this.hiveDummyDatabasesToIgnore    = hiveDummyDatabasesToIgnore;
-        this.hiveDummyTablesToIgnore       = hiveDummyTablesToIgnore;
-        this.hiveTablePrefixesToIgnore     = hiveTablePrefixesToIgnore;
-        this.hiveTypesRemoveOwnedRefAttrs  = hiveTypesRemoveOwnedRefAttrs;
-        this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, 
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, 
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, 
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, 
boolean rdbmsTypesRemoveOwnedRefAttrs, boolean 
updateHiveProcessNameWithQualifiedName) {
+        this.kafkaMessage                           = kafkaMessage;
+        this.typeRegistry                           = typeRegistry;
+        this.hiveTablesToIgnore                     = hiveTablesToIgnore;
+        this.hiveTablesToPrune                      = hiveTablesToPrune;
+        this.hiveTablesCache                        = hiveTablesCache;
+        this.hiveDummyDatabasesToIgnore             = 
hiveDummyDatabasesToIgnore;
+        this.hiveDummyTablesToIgnore                = hiveDummyTablesToIgnore;
+        this.hiveTablePrefixesToIgnore              = 
hiveTablePrefixesToIgnore;
+        this.hiveTypesRemoveOwnedRefAttrs           = 
hiveTypesRemoveOwnedRefAttrs;
+        this.rdbmsTypesRemoveOwnedRefAttrs          = 
rdbmsTypesRemoveOwnedRefAttrs;
+        this.updateHiveProcessNameWithQualifiedName = 
updateHiveProcessNameWithQualifiedName;
 
         final HookNotification  message = kafkaMessage.getMessage();
 
@@ -98,7 +100,7 @@ public class PreprocessorContext {
             break;
         }
 
-        this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty();
+        this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs || 
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || 
!hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
     }
 
     public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -113,6 +115,8 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean updateHiveProcessNameWithQualifiedName() { return 
updateHiveProcessNameWithQualifiedName; }
+
     public boolean getHiveTypesRemoveOwnedRefAttrs() { return 
hiveTypesRemoveOwnedRefAttrs; }
 
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return 
rdbmsTypesRemoveOwnedRefAttrs; }
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 93675e8..33191a7 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.*;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.util.AtlasMetricsUtil;
@@ -89,7 +90,7 @@ public class NotificationHookConsumerKafkaTest {
         MockitoAnnotations.initMocks(this);
 
         AtlasType                mockType   = mock(AtlasType.class);
-        AtlasEntitiesWithExtInfo mockEntity = 
mock(AtlasEntitiesWithExtInfo.class);
+        AtlasEntitiesWithExtInfo mockEntity = new 
AtlasEntitiesWithExtInfo(createV2Entity());
 
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
 
@@ -220,6 +221,16 @@ public class NotificationHookConsumerKafkaTest {
         return entity;
     }
 
+    AtlasEntity createV2Entity() {
+        final AtlasEntity entity = new 
AtlasEntity(AtlasClient.DATA_SET_SUPER_TYPE);
+
+        entity.setAttribute(NAME, "db" + randomString());
+        entity.setAttribute(DESCRIPTION, randomString());
+        entity.setAttribute(QUALIFIED_NAME, randomString());
+
+        return entity;
+    }
+
     protected String randomString() {
         return RandomStringUtils.randomAlphanumeric(10);
     }
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index ece46a4..3774064 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import 
org.apache.atlas.model.notification.HookNotification.HookNotificationType;
@@ -85,7 +86,7 @@ public class NotificationHookConsumerTest {
         MockitoAnnotations.initMocks(this);
 
         AtlasType                mockType   = mock(AtlasType.class);
-        AtlasEntitiesWithExtInfo mockEntity = 
mock(AtlasEntitiesWithExtInfo.class);
+        AtlasEntitiesWithExtInfo mockEntity = new 
AtlasEntitiesWithExtInfo(mock(AtlasEntity.class));
 
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);

Reply via email to