Repository: atlas Updated Branches: refs/heads/branch-0.8 ccd417abc -> 0c4c3b581
ATLAS-2900: Export connected addressed case where imported. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/0c4c3b58 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/0c4c3b58 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/0c4c3b58 Branch: refs/heads/branch-0.8 Commit: 0c4c3b581d944f2889feeffd689b1e5c2725f560 Parents: ccd417a Author: Ashutosh Mestry <[email protected]> Authored: Fri Oct 26 14:10:59 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Fri Oct 26 14:10:59 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/ExportService.java | 25 +++++++++++++------- .../atlas/repository/util/UniqueList.java | 5 ++-- .../impexp/ExportIncrementalTest.java | 23 ++++++++++++++++++ .../repository/impexp/ExportServiceTest.java | 2 +- .../stocksDB-Entities/export-connected.json | 10 ++++++++ 5 files changed, 53 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/0c4c3b58/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index d1f4c8d..7382d32 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -128,18 +128,16 @@ public class ExportService { long startTime, long endTime) throws AtlasBaseException { int duration = getOperationDuration(startTime, endTime); context.result.setSourceClusterName(AuditsWriter.getCurrentClusterName()); - context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); - context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); + context.addToEntityCreationOrder(context.lineageProcessed); + + context.sink.setExportOrder(context.entityCreationOrder.getList()); context.sink.setTypesDef(context.result.getData().getTypesDef()); context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.incrementMeticsCounter("duration", duration); - auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); - clearContextData(context); - context.sink.setResult(context.result); - } + auditsWriter.write(userName, context.result, startTime, endTime, context.entityCreationOrder.getList()); - private void clearContextData(ExportContext context) { context.result.setData(null); + context.sink.setResult(context.result); } private int getOperationDuration(long startTime, long endTime) { @@ -368,7 +366,7 @@ public class ExportService { TraversalDirection direction) throws AtlasBaseException { if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) { - context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + context.addToEntityCreationOrder(entityWithExtInfo.getEntity().getGuid()); } addEntity(entityWithExtInfo, context); @@ -539,7 +537,7 @@ public class ExportService { } else { List<AtlasEntity> entities = context.getEntitiesWithModifiedTimestamp(entityWithExtInfo); for (AtlasEntity e : entities) { - context.result.getData().getEntityCreationOrder().add(e.getGuid()); + context.addToEntityCreationOrder(e.getGuid()); context.addToSink(new AtlasEntityWithExtInfo(e)); context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); } @@ -600,6 +598,7 @@ public class ExportService { private static final String ATLAS_TYPE_HIVE_DB = "hive_db"; + final UniqueList<String> entityCreationOrder = new UniqueList<>(); final Set<String> guidsProcessed = new HashSet<>(); final private UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>(); @@ -708,5 +707,13 @@ public class ExportService { public boolean isHiveDBIncrementalSkipLineage() { return isHiveDBIncremental; } + + public void addToEntityCreationOrder(String guid) { + entityCreationOrder.add(guid); + } + + public void addToEntityCreationOrder(Collection<String> guids) { + entityCreationOrder.addAll(guids); + } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/0c4c3b58/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java index eebbc4e..fb74dc4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java +++ b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.util; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -44,8 +45,8 @@ public class UniqueList<T> { } } - public void addAll(List<T> list) { - for (T item : list) { + public void addAll(Collection<T> collection) { + for (T item : collection) { add(item); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/0c4c3b58/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index 75ef77c..a355297 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -27,6 +27,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1; +import org.apache.atlas.repository.util.UniqueList; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -39,6 +40,7 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER; @@ -63,6 +65,7 @@ public class ExportIncrementalTest extends ExportImportTestBase { private AtlasEntityStoreV1 entityStore; private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; + private final String EXPORT_REQUEST_CONNECTED = "export-connected"; private long nextTimestamp; @BeforeClass @@ -158,6 +161,17 @@ public class ExportIncrementalTest extends ExportImportTestBase { assertNotNull(source); } + @Test + public void connectedExport() { + ZipSource source = runExportWithParameters(exportService, getConnected()); + + UniqueList<String> creationOrder = new UniqueList<>(); + List<String> zipCreationOrder = source.getCreationOrder(); + creationOrder.addAll(zipCreationOrder); + assertNotNull(source); + assertEquals(creationOrder.size(), zipCreationOrder.size()); + } + private AtlasExportRequest getIncrementalRequest(long timestamp) { try { AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class); @@ -168,4 +182,13 @@ public class ExportIncrementalTest extends ExportImportTestBase { throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_INCREMENTAL)); } } + + private AtlasExportRequest getConnected() { + try { + return TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_CONNECTED, AtlasExportRequest.class); + } catch (IOException e) { + throw new SkipException(String.format("getIncrementalRequest: '%s' could not be laoded.", EXPORT_REQUEST_CONNECTED)); + } + } + } http://git-wip-us.apache.org/repos/asf/atlas/blob/0c4c3b58/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java index 0d943a4..a332175 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportServiceTest.java @@ -366,7 +366,7 @@ public class ExportServiceTest extends ExportImportTestBase { private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException { assertNotNull(zipSource.getCreationOrder()); - assertTrue(zipSource.getCreationOrder().size() == 2); + assertEquals(zipSource.getCreationOrder().size(), 1); assertTrue(zipSource.hasNext()); AtlasEntity entity = zipSource.next(); http://git-wip-us.apache.org/repos/asf/atlas/blob/0c4c3b58/repository/src/test/resources/json/stocksDB-Entities/export-connected.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-connected.json b/repository/src/test/resources/json/stocksDB-Entities/export-connected.json new file mode 100644 index 0000000..1b97370 --- /dev/null +++ b/repository/src/test/resources/json/stocksDB-Entities/export-connected.json @@ -0,0 +1,10 @@ +{ + "itemsToExport": [ + { + "typeName": "hive_table", "uniqueAttributes": { "qualifiedName": "stocks_base.stocks_daily@cl1" } + } + ], + "options": { + "fetchType": "connected" + } +}
