Repository: incubator-atlas Updated Branches: refs/heads/master 1d85e95fa -> ce54e8a4d
ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ce54e8a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ce54e8a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ce54e8a4 Branch: refs/heads/master Commit: ce54e8a4de04f410c3129ebe1dafc6864e1aa98c Parents: 1d85e95 Author: apoorvnaik <[email protected]> Authored: Mon Feb 13 10:24:49 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Feb 17 16:13:48 2017 -0800 ---------------------------------------------------------------------- .../converters/AtlasInstanceConverter.java | 8 +++--- .../store/graph/v1/AtlasEntityStoreV1.java | 6 ++--- .../store/graph/v1/AtlasGraphUtilsV1.java | 5 ++++ .../notification/NotificationHookConsumer.java | 27 +++++++++++++------- .../NotificationHookConsumerKafkaTest.java | 2 +- .../NotificationHookConsumerTest.java | 2 +- 6 files changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 95dcc7a..9d475bf 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -125,7 +125,7 @@ public class AtlasInstanceConverter { return ret; } - public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { + public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException { AtlasEntityFormatConverter converter = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName()); @@ -187,9 +187,9 @@ public class AtlasInstanceConverter { return new AtlasBaseException(e); } - public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException { + public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntities"); + LOG.debug("==> toAtlasEntities"); } AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext(); @@ -199,7 +199,7 @@ public class AtlasInstanceConverter { context.addEntity(entity); } if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntities"); + LOG.debug("<== toAtlasEntities"); } return context.getEntities(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 4684bfe..31a5e8c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); } - AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes); + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes); - updatedEntity.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex)); + updatedEntity.setGuid(guid); return createOrUpdate(new AtlasEntityStream(updatedEntity), true); } @@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } } - Collection<AtlasVertex> deletionCandidates = new ArrayList<AtlasVertex>(); + Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); deletionCandidates.add(vertex); return deleteVertices(deletionCandidates); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index efc50d3..49d5a08 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 { return vertex; } + public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException { + AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues); + return getIdFromVertex(vertexByUniqueAttributes); + } + public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) { AtlasVertex vertex = null; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 891d7ac..c16fd66 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.service.Service; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -54,7 +55,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.atlas.notification.hook.HookNotification.*; +import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; +import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; +import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; /** * Consumer of notifications from hooks e.g., hive hook etc. @@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl EntityCreateRequest createRequest = (EntityCreateRequest) message; audit(messageUser, AtlasClient.API.CREATE_ENTITY); - entities = instanceConverter.getEntities(createRequest.getEntities()); + entities = instanceConverter.toAtlasEntities(createRequest.getEntities()); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); break; @@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL); Referenceable referenceable = partialUpdateRequest.getEntity(); - entities = instanceConverter.getEntities(Collections.singletonList(referenceable)); - // There should only be one root entity after the conversion - AtlasEntity entity = entities.getEntities().get(0); - // Need to set the attributes explicitly here as the qualified name might have changed during update - entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + entities = instanceConverter.toAtlasEntity(referenceable); + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){ + { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); } + }); + + // There should only be one root entity + entities.getEntities().get(0).setGuid(guid); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true); break; @@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; audit(messageUser, AtlasClient.API.UPDATE_ENTITY); - entities = instanceConverter.getEntities(updateRequest.getEntities()); + entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); break; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 13747b2..e744e2e 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest { AtlasType mockType = mock(AtlasType.class); when(typeRegistry.getType(anyString())).thenReturn(mockType); AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); - when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); + when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); kafkaNotification = startKafkaServer(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index b86c693..bdb60a2 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -75,7 +75,7 @@ public class NotificationHookConsumerTest { AtlasType mockType = mock(AtlasType.class); when(typeRegistry.getType(anyString())).thenReturn(mockType); AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); - when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); + when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); }
