This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 9a1b2cc28e6c325b2075dcfec307ed99df0fff26
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri May 15 11:56:23 2020 -0700

    ATLAS-3799: EntityConumer only adds entity GUIDs that are added when they 
were produced.
    
    (cherry picked from commit 3de30f55d808df6245f60fe01fed4b6cf894a31d)
---
 .../graph/v2/bulkimport/pc/EntityConsumer.java     | 25 +++++-----------------
 1 file changed, 5 insertions(+), 20 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index 6b5b523..34b0561 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -59,7 +59,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     private final EntityGraphRetriever entityRetrieverBulk;
 
     private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new 
ArrayList<>();
-    private List<EntityMutationResponse> localResults = new ArrayList<>();
+    private List<String> localResults = new ArrayList<>();
 
     public EntityConsumer(AtlasTypeRegistry typeRegistry,
                           AtlasGraph atlasGraph, AtlasEntityStore entityStore,
@@ -119,7 +119,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo) throws AtlasBaseException {
         EntityStream oneEntityStream = new 
AtlasEntityStreamForImport(entityWithExtInfo, null);
         EntityMutationResponse result = 
entityStoreBulk.createOrUpdateForImportNoCommit(oneEntityStream);
-        localResults.add(result);
+        localResults.add(entityWithExtInfo.getEntity().getGuid());
         entityBuffer.add(entityWithExtInfo);
     }
 
@@ -133,9 +133,9 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
             try {
                 LOG.info("Regular: EntityStore: {}: Starting...", 
this.counter.get());
                 AtlasEntityStreamForImport oneEntityStream = new 
AtlasEntityStreamForImport(entityWithExtInfo, null);
-                EntityMutationResponse result = 
this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
+                
this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream);
                 atlasGraph.commit();
-                localResults.add(result);
+                localResults.add(entityWithExtInfo.getEntity().getGuid());
                 dispatchResults();
             } catch (Exception e) {
                 atlasGraph.rollback();
@@ -244,12 +244,7 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     }
 
     private void dispatchResults() {
-        localResults.stream().forEach(x -> {
-            addResultsFromResponse(x.getCreatedEntities());
-            addResultsFromResponse(x.getUpdatedEntities());
-            addResultsFromResponse(x.getDeletedEntities());
-        });
-
+        localResults.stream().forEach(x -> addResult(x));
         clear();
     }
 
@@ -261,16 +256,6 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
         }
     }
 
-    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
-        if (CollectionUtils.isEmpty(entities)) {
-            return;
-        }
-
-        for (AtlasEntityHeader eh : entities) {
-            addResult(eh.getGuid());
-        }
-    }
-
     private void clear() {
         localResults.clear();
         entityBuffer.clear();

Reply via email to