Repository: incubator-atlas
Updated Branches:
  refs/heads/master 1d85e95fa -> ce54e8a4d


ATLAS-1499: Notification processing using V2 Store (#2) - fixes in handling of partial-update notifications

Signed-off-by: Madhan Neethiraj <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ce54e8a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ce54e8a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ce54e8a4

Branch: refs/heads/master
Commit: ce54e8a4de04f410c3129ebe1dafc6864e1aa98c
Parents: 1d85e95
Author: apoorvnaik <[email protected]>
Authored: Mon Feb 13 10:24:49 2017 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Fri Feb 17 16:13:48 2017 -0800

----------------------------------------------------------------------
 .../converters/AtlasInstanceConverter.java      |  8 +++---
 .../store/graph/v1/AtlasEntityStoreV1.java      |  6 ++---
 .../store/graph/v1/AtlasGraphUtilsV1.java       |  5 ++++
 .../notification/NotificationHookConsumer.java  | 27 +++++++++++++-------
 .../NotificationHookConsumerKafkaTest.java      |  2 +-
 .../NotificationHookConsumerTest.java           |  2 +-
 6 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
index 95dcc7a..9d475bf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java
@@ -125,7 +125,7 @@ public class AtlasInstanceConverter {
         return ret;
     }
 
-    public AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
+    public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntity(IReferenceableInstance referenceable) throws AtlasBaseException {
 
         AtlasEntityFormatConverter converter  = (AtlasEntityFormatConverter) instanceFormatters.getConverter(TypeCategory.ENTITY);
         AtlasEntityType      entityType = typeRegistry.getEntityTypeByName(referenceable.getTypeName());
@@ -187,9 +187,9 @@ public class AtlasInstanceConverter {
         return new AtlasBaseException(e);
     }
 
-    public AtlasEntity.AtlasEntitiesWithExtInfo getEntities(List<Referenceable> referenceables) throws AtlasBaseException {
+    public AtlasEntity.AtlasEntitiesWithExtInfo toAtlasEntities(List<Referenceable> referenceables) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getEntities");
+            LOG.debug("==> toAtlasEntities");
         }
 
         AtlasFormatConverter.ConverterContext context = new AtlasFormatConverter.ConverterContext();
@@ -199,7 +199,7 @@ public class AtlasInstanceConverter {
             context.addEntity(entity);
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getEntities");
+            LOG.debug("<== toAtlasEntities");
         }
 
         return context.getEntities();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 4684bfe..31a5e8c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -224,9 +224,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update.");
         }
 
-        AtlasVertex entityVertex = AtlasGraphUtilsV1.getVertexByUniqueAttributes(entityType, uniqAttributes);
+        String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, uniqAttributes);
 
-        updatedEntity.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex));
+        updatedEntity.setGuid(guid);
 
         return createOrUpdate(new AtlasEntityStream(updatedEntity), true);
     }
@@ -249,7 +249,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
             }
         }
 
-        Collection<AtlasVertex> deletionCandidates = new ArrayList<AtlasVertex>();
+        Collection<AtlasVertex> deletionCandidates = new ArrayList<>();
         deletionCandidates.add(vertex);
 
         return deleteVertices(deletionCandidates);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
index efc50d3..49d5a08 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java
@@ -190,6 +190,11 @@ public class AtlasGraphUtilsV1 {
         return vertex;
     }
 
+    public static String getGuidByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) throws AtlasBaseException {
+        AtlasVertex vertexByUniqueAttributes = getVertexByUniqueAttributes(entityType, attrValues);
+        return getIdFromVertex(vertexByUniqueAttributes);
+    }
+
     public static AtlasVertex findByUniqueAttributes(AtlasEntityType entityType, Map<String, Object> attrValues) {
         AtlasVertex vertex = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 891d7ac..c16fd66 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -29,9 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +55,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.atlas.notification.hook.HookNotification.*;
+import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
+import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -249,7 +253,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             EntityCreateRequest createRequest = (EntityCreateRequest) message;
                             audit(messageUser, AtlasClient.API.CREATE_ENTITY);
 
-                            entities = instanceConverter.getEntities(createRequest.getEntities());
+                            entities = instanceConverter.toAtlasEntities(createRequest.getEntities());
 
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                             break;
@@ -262,11 +266,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             audit(messageUser, AtlasClient.API.UPDATE_ENTITY_PARTIAL);
 
                             Referenceable referenceable = partialUpdateRequest.getEntity();
-                            entities = instanceConverter.getEntities(Collections.singletonList(referenceable));
-                            // There should only be one root entity after the conversion
-                            AtlasEntity entity = entities.getEntities().get(0);
-                            // Need to set the attributes explicitly here as the qualified name might have changed during update
-                            entity.setAttribute(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+                            entities = instanceConverter.toAtlasEntity(referenceable);
+
+                            AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
+                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
+                                { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
+                            });
+
+                            // There should only be one root entity
+                            entities.getEntities().get(0).setGuid(guid);
+
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
                             break;
 
@@ -293,7 +302,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                             EntityUpdateRequest updateRequest = (EntityUpdateRequest) message;
                             audit(messageUser, AtlasClient.API.UPDATE_ENTITY);
 
-                            entities = instanceConverter.getEntities(updateRequest.getEntities());
+                            entities = instanceConverter.toAtlasEntities(updateRequest.getEntities());
                             atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                             break;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 13747b2..e744e2e 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -77,7 +77,7 @@ public class NotificationHookConsumerKafkaTest {
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
-        when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+        when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
         kafkaNotification = startKafkaServer();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ce54e8a4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index b86c693..bdb60a2 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -75,7 +75,7 @@ public class NotificationHookConsumerTest {
         AtlasType mockType = mock(AtlasType.class);
         when(typeRegistry.getType(anyString())).thenReturn(mockType);
         AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class);
-        when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity);
+        when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity);
         EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class);
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse);
     }

Reply via email to