This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 9a1b2cc28e6c325b2075dcfec307ed99df0fff26 Author: Ashutosh Mestry <[email protected]> AuthorDate: Fri May 15 11:56:23 2020 -0700 ATLAS-3799: EntityConsumer only adds entity GUIDs that are added when they were produced. (cherry picked from commit 3de30f55d808df6245f60fe01fed4b6cf894a31d) --- .../graph/v2/bulkimport/pc/EntityConsumer.java | 25 +++++----------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java index 6b5b523..34b0561 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -59,7 +59,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith private final EntityGraphRetriever entityRetrieverBulk; private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>(); - private List<EntityMutationResponse> localResults = new ArrayList<>(); + private List<String> localResults = new ArrayList<>(); public EntityConsumer(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStore entityStore, @@ -119,7 +119,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith private void importUsingBulkEntityStore(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { EntityStream oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); EntityMutationResponse result = entityStoreBulk.createOrUpdateForImportNoCommit(oneEntityStream); - localResults.add(result); + localResults.add(entityWithExtInfo.getEntity().getGuid()); entityBuffer.add(entityWithExtInfo); } @@ -133,9 +133,9 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith 
try { LOG.info("Regular: EntityStore: {}: Starting...", this.counter.get()); AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); - EntityMutationResponse result = this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream); + this.entityStore.createOrUpdateForImportNoCommit(oneEntityStream); atlasGraph.commit(); - localResults.add(result); + localResults.add(entityWithExtInfo.getEntity().getGuid()); dispatchResults(); } catch (Exception e) { atlasGraph.rollback(); @@ -244,12 +244,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith } private void dispatchResults() { - localResults.stream().forEach(x -> { - addResultsFromResponse(x.getCreatedEntities()); - addResultsFromResponse(x.getUpdatedEntities()); - addResultsFromResponse(x.getDeletedEntities()); - }); - + localResults.stream().forEach(x -> addResult(x)); clear(); } @@ -261,16 +256,6 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith } } - private void addResultsFromResponse(List<AtlasEntityHeader> entities) { - if (CollectionUtils.isEmpty(entities)) { - return; - } - - for (AtlasEntityHeader eh : entities) { - addResult(eh.getGuid()); - } - } - private void clear() { localResults.clear(); entityBuffer.clear();
