This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 10bcaa8 ATLAS-3621: updated HiveHook to not save query-string in
multiple attributes - queryText and name
10bcaa8 is described below
commit 10bcaa8091b548efd82e993d6844623f93f1e1f8
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Fri Feb 14 18:02:26 2020 -0800
ATLAS-3621: updated HiveHook to not save query-string in multiple
attributes - queryText and name
---
.../atlas/hive/hook/events/BaseHiveEvent.java | 4 +--
.../notification/NotificationHookConsumer.java | 38 ++++++++++++++++++++--
.../preprocessor/HivePreprocessor.java | 8 +++++
.../preprocessor/PreprocessorContext.java | 28 +++++++++-------
.../NotificationHookConsumerKafkaTest.java | 13 +++++++-
.../notification/NotificationHookConsumerTest.java | 3 +-
6 files changed, 75 insertions(+), 19 deletions(-)
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 0409d8a..35d85f3 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -646,7 +646,6 @@ public abstract class BaseHiveEvent {
if (queryStr != null) {
queryStr = queryStr.toLowerCase().trim();
}
- ret.setAttribute(ATTRIBUTE_NAME, queryStr);
ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName());
String qualifiedName = getQualifiedName(inputs, outputs);
@@ -661,6 +660,7 @@ public abstract class BaseHiveEvent {
}
}
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, qualifiedName);
ret.setRelationshipAttribute(ATTRIBUTE_INPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIds(inputs,
RELATIONSHIP_DATASET_PROCESS_INPUTS));
ret.setRelationshipAttribute(ATTRIBUTE_OUTPUTS,
AtlasTypeUtil.getAtlasRelatedObjectIds(outputs,
RELATIONSHIP_PROCESS_DATASET_OUTPUTS));
@@ -697,7 +697,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
hiveProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString() +
QNAME_SEP_PROCESS + getQueryStartTime().toString() +
QNAME_SEP_PROCESS + endTime.toString());
- ret.setAttribute(ATTRIBUTE_NAME, queryStr + QNAME_SEP_PROCESS +
getQueryStartTime().toString());
+ ret.setAttribute(ATTRIBUTE_NAME,
ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime());
ret.setAttribute(ATTRIBUTE_END_TIME, endTime);
ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName());
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 14cae42..3f1ea05 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -146,6 +146,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_DUMMY_NAMES =
"atlas.notification.consumer.preprocess.hive_table.ignore.dummy.names";
public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES_ENABLED =
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes.enabled";
public static final String
CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_NAME_PREFIXES =
"atlas.notification.consumer.preprocess.hive_table.ignore.name.prefixes";
+ public static final String
CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME =
"atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name";
public static final String
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS =
"atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
public static final String
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS =
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER
= "atlas.notification.authorize.using.message.user";
@@ -165,6 +166,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private final int commitBatchSize;
private final boolean skipHiveColumnLineageHive20633;
private final int
skipHiveColumnLineageHive20633InputsThreshold;
+ private final boolean
updateHiveProcessNameWithQualifiedName;
private final int
largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private final List<Pattern> hiveTablesToIgnore = new
ArrayList<>();
@@ -212,6 +214,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
skipHiveColumnLineageHive20633 =
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633,
false);
skipHiveColumnLineageHive20633InputsThreshold =
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
15); // skip if avg # of inputs is > 15
+ updateHiveProcessNameWithQualifiedName =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME,
true);
consumerDisabled =
applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs =
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference =
AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
@@ -294,9 +297,11 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
hiveTablePrefixesToIgnore = Collections.emptyList();
}
+ LOG.info("{}={}",
CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME,
updateHiveProcessNameWithQualifiedName);
+
hiveTypesRemoveOwnedRefAttrs =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
true);
rdbmsTypesRemoveOwnedRefAttrs =
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
true);
- preprocessEnabled = skipHiveColumnLineageHive20633 ||
hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs ||
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() ||
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() ||
!hiveTablePrefixesToIgnore.isEmpty();
+ preprocessEnabled = skipHiveColumnLineageHive20633 ||
updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs ||
rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() ||
!hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() ||
!hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty();
LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633,
skipHiveColumnLineageHive20633);
LOG.info("{}={}",
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
skipHiveColumnLineageHive20633InputsThreshold);
@@ -584,6 +589,33 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
return;
}
+ // convert V1 messages to V2 to enable preProcess
+ try {
+ switch (message.getType()) {
+ case ENTITY_CREATE: {
+ final EntityCreateRequest createRequest =
(EntityCreateRequest) message;
+ final AtlasEntitiesWithExtInfo entities =
instanceConverter.toAtlasEntities(createRequest.getEntities());
+ final EntityCreateRequestV2 v2Request = new
EntityCreateRequestV2(message.getUser(), entities);
+
+ kafkaMsg = new AtlasKafkaMessage<>(v2Request,
kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
+ message = kafkaMsg.getMessage();
+ }
+ break;
+
+ case ENTITY_FULL_UPDATE: {
+ final EntityUpdateRequest updateRequest =
(EntityUpdateRequest) message;
+ final AtlasEntitiesWithExtInfo entities =
instanceConverter.toAtlasEntities(updateRequest.getEntities());
+ final EntityUpdateRequestV2 v2Request = new
EntityUpdateRequestV2(messageUser, entities);
+
+ kafkaMsg = new AtlasKafkaMessage<>(v2Request,
kafkaMsg.getOffset(), kafkaMsg.getTopic(), kafkaMsg.getPartition());
+ message = kafkaMsg.getMessage();
+ }
+ break;
+ }
+ } catch (AtlasBaseException excp) {
+ LOG.error("handleMessage(): failed to convert V1 message
to V2", message.getType().name());
+ }
+
PreprocessorContext context =
preProcessNotificationMessage(kafkaMsg);
if (isEmptyMessage(kafkaMsg)) {
@@ -934,7 +966,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
PreprocessorContext context = null;
if (preprocessEnabled) {
- context = new PreprocessorContext(kafkaMsg, typeRegistry,
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore,
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
+ context = new PreprocessorContext(kafkaMsg, typeRegistry,
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache,
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore,
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs,
updateHiveProcessNameWithQualifiedName);
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
@@ -950,7 +982,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
context.moveRegisteredReferredEntities();
- if (context.isHivePreprocessEnabled() &&
CollectionUtils.isNotEmpty(context.getEntities())) {
+ if (context.isHivePreprocessEnabled() &&
CollectionUtils.isNotEmpty(context.getEntities()) &&
context.getEntities().size() > 1) {
// move hive_process and hive_column_lineage entities to end
of the list
List<AtlasEntity> entities = context.getEntities();
int count = entities.size();
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index cc5fe52..4bb279d 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -162,6 +162,14 @@ public class HivePreprocessor {
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext
context) {
+ if (context.updateHiveProcessNameWithQualifiedName()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setting {}.name={}. topic-offset={},
partition={}", entity.getTypeName(),
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), context.getKafkaMessageOffset(),
context.getKafkaPartition());
+ }
+
+ entity.setAttribute(ATTRIBUTE_NAME,
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
if (context.isIgnoredEntity(entity.getGuid())) {
context.addToIgnoredEntities(entity); // so that this will be
logged with typeName and qualifiedName
} else {
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 3c58c9f..608b4a3 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -59,6 +59,7 @@ public class PreprocessorContext {
private final List<String>
hiveDummyDatabasesToIgnore;
private final List<String> hiveDummyTablesToIgnore;
private final List<String>
hiveTablePrefixesToIgnore;
+ private final boolean
updateHiveProcessNameWithQualifiedName;
private final boolean
hiveTypesRemoveOwnedRefAttrs;
private final boolean
rdbmsTypesRemoveOwnedRefAttrs;
private final boolean isHivePreProcessEnabled;
@@ -70,17 +71,18 @@ public class PreprocessorContext {
private final Map<String, String> guidAssignments =
new HashMap<>();
private List<AtlasEntity> postUpdateEntities =
null;
- public PreprocessorContext(AtlasKafkaMessage<HookNotification>
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore,
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache,
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore,
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs,
boolean rdbmsTypesRemoveOwnedRefAttrs) {
- this.kafkaMessage = kafkaMessage;
- this.typeRegistry = typeRegistry;
- this.hiveTablesToIgnore = hiveTablesToIgnore;
- this.hiveTablesToPrune = hiveTablesToPrune;
- this.hiveTablesCache = hiveTablesCache;
- this.hiveDummyDatabasesToIgnore = hiveDummyDatabasesToIgnore;
- this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
- this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore;
- this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs;
- this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
+ public PreprocessorContext(AtlasKafkaMessage<HookNotification>
kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore,
List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache,
List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore,
List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs,
boolean rdbmsTypesRemoveOwnedRefAttrs, boolean
updateHiveProcessNameWithQualifiedName) {
+ this.kafkaMessage = kafkaMessage;
+ this.typeRegistry = typeRegistry;
+ this.hiveTablesToIgnore = hiveTablesToIgnore;
+ this.hiveTablesToPrune = hiveTablesToPrune;
+ this.hiveTablesCache = hiveTablesCache;
+ this.hiveDummyDatabasesToIgnore =
hiveDummyDatabasesToIgnore;
+ this.hiveDummyTablesToIgnore = hiveDummyTablesToIgnore;
+ this.hiveTablePrefixesToIgnore =
hiveTablePrefixesToIgnore;
+ this.hiveTypesRemoveOwnedRefAttrs =
hiveTypesRemoveOwnedRefAttrs;
+ this.rdbmsTypesRemoveOwnedRefAttrs =
rdbmsTypesRemoveOwnedRefAttrs;
+ this.updateHiveProcessNameWithQualifiedName =
updateHiveProcessNameWithQualifiedName;
final HookNotification message = kafkaMessage.getMessage();
@@ -98,7 +100,7 @@ public class PreprocessorContext {
break;
}
- this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs ||
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() ||
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() ||
!hiveTablePrefixesToIgnore.isEmpty();
+ this.isHivePreProcessEnabled = hiveTypesRemoveOwnedRefAttrs ||
!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() ||
!hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() ||
!hiveTablePrefixesToIgnore.isEmpty() || updateHiveProcessNameWithQualifiedName;
}
public AtlasKafkaMessage<HookNotification> getKafkaMessage() {
@@ -113,6 +115,8 @@ public class PreprocessorContext {
return kafkaMessage.getPartition();
}
+ public boolean updateHiveProcessNameWithQualifiedName() { return
updateHiveProcessNameWithQualifiedName; }
+
public boolean getHiveTypesRemoveOwnedRefAttrs() { return
hiveTypesRemoveOwnedRefAttrs; }
public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return
rdbmsTypesRemoveOwnedRefAttrs; }
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 93675e8..33191a7 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.kafka.*;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.util.AtlasMetricsUtil;
@@ -89,7 +90,7 @@ public class NotificationHookConsumerKafkaTest {
MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
- AtlasEntitiesWithExtInfo mockEntity =
mock(AtlasEntitiesWithExtInfo.class);
+ AtlasEntitiesWithExtInfo mockEntity = new
AtlasEntitiesWithExtInfo(createV2Entity());
when(typeRegistry.getType(anyString())).thenReturn(mockType);
@@ -220,6 +221,16 @@ public class NotificationHookConsumerKafkaTest {
return entity;
}
+ AtlasEntity createV2Entity() {
+ final AtlasEntity entity = new
AtlasEntity(AtlasClient.DATA_SET_SUPER_TYPE);
+
+ entity.setAttribute(NAME, "db" + randomString());
+ entity.setAttribute(DESCRIPTION, randomString());
+ entity.setAttribute(QUALIFIED_NAME, randomString());
+
+ return entity;
+ }
+
protected String randomString() {
return RandomStringUtils.randomAlphanumeric(10);
}
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index ece46a4..3774064 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
import
org.apache.atlas.model.notification.HookNotification.HookNotificationType;
@@ -85,7 +86,7 @@ public class NotificationHookConsumerTest {
MockitoAnnotations.initMocks(this);
AtlasType mockType = mock(AtlasType.class);
- AtlasEntitiesWithExtInfo mockEntity =
mock(AtlasEntitiesWithExtInfo.class);
+ AtlasEntitiesWithExtInfo mockEntity = new
AtlasEntitiesWithExtInfo(mock(AtlasEntity.class));
when(typeRegistry.getType(anyString())).thenReturn(mockType);
when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);