This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 80239468dc78e52e5edece828f7495a5abde3273 Author: Ashutosh Mestry <[email protected]> AuthorDate: Fri Mar 20 13:06:01 2020 -0700 ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation. (cherry picked from commit dff690a00e3ef690ffd584b07d243586d64b31b7) --- .../atlas/repository/impexp/ZipSourceDirect.java | 8 ++----- .../migration/DataMigrationStatusService.java | 4 ++-- .../graph/v2/bulkimport/pc/EntityConsumer.java | 7 +++++- .../v2/bulkimport/pc/EntityCreationManager.java | 28 ++++++++++------------ .../impexp/DataMigrationStatusServiceTest.java | 2 +- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java index 75b8e9e..04342fa 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java @@ -175,12 +175,8 @@ public class ZipSourceDirect implements EntityImportStream { @Override public void setPosition(int index) { - try { - for (int i = 0; i < index; i++) { - moveNextEntry(); - } - } catch (IOException e) { - LOG.error("Error setting position: {}. Position may be beyond the stream size.", index); + for (int i = 0; i < index; i++) { + moveNext(); } } diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java index 8c7a3a8..b891684 100644 --- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java +++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java @@ -99,8 +99,8 @@ public class DataMigrationStatusService { this.status = null; } - public void savePosition(String position) { - this.status.setCurrentIndex(Long.valueOf(position)); + public void savePosition(Long position) { + this.status.setCurrentIndex(position); this.migrationStatusVertexManagement.updateVertexPartial(this.status); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java index d0fac10..fc7d392 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -90,6 +90,8 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) { try { RequestContext.get().setImportInProgress(true); + RequestContext.get().setCreateShellEntityForNonExistingReference(true); + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null); LOG.debug("Processing: {}", currentCount); @@ -163,7 +165,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith } private void retryProcessEntity(int retryCount) { - LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + if (LOG.isDebugEnabled() || retryCount > 1) { + LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount); + } + for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) { processEntity(e, counter.get()); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java index 177b563..734add6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java @@ -35,7 +35,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag private static final String WORKER_PREFIX = "migration-import"; private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 5 min - private final StatusReporter<String, String> statusReporter; + private final StatusReporter<String, Long> statusReporter; private final AtlasImportResult importResult; private final DataMigrationStatusService dataMigrationStatusService; private String currentTypeName; @@ -51,7 +51,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag } public long read(EntityImportStream entityStream) { - long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex(); + long currentIndex = entityStream.getPosition(); AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo; this.entityImportStream = entityStream; this.dataMigrationStatusService.setStatus("IN_PROGRESS"); @@ -68,6 +68,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag break; } } + + this.dataMigrationStatusService.setStatus("DONE"); return currentIndex; } @@ -83,7 +85,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag } setCurrentTypeName(typeName); - statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex)); + statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), currentIndex); super.checkProduce(entityWithExtInfo); extractResults(); } @@ -98,25 +100,19 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag } private void logStatus() { - String ack = statusReporter.ack(); - if (StringUtils.isEmpty(ack)) { - return; - } - - String[] split = ack.split(":"); - if (split.length == 0 || split.length < 2) { + Long ack = statusReporter.ack(); + if (ack == null) { return; } - importResult.incrementMeticsCounter(split[0]); - String currentPosition = split[1]; - dataMigrationStatusService.savePosition(currentPosition); - this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent()); + importResult.incrementMeticsCounter(getCurrentTypeName()); + dataMigrationStatusService.savePosition(ack); + this.currentPercent = updateImportMetrics(getCurrentTypeName(), ack, this.entityImportStream.size(), getCurrentPercent()); } - private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) { + private static float updateImportMetrics(String typeNameGuid, long currentIndex, int streamSize, float currentPercent) { String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex); - return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported); + return BulkImporterImpl.updateImportProgress(LOG, (int) currentIndex, streamSize, currentPercent, lastEntityImported); } private String getCurrentTypeName() { diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java index bf1d9a0..5f1cd0b 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java @@ -54,7 +54,7 @@ public class DataMigrationStatusServiceTest { assertEquals(ret.getTotalCount(), expected.getTotalCount()); assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex()); - dataMigrationStatusService.savePosition("100"); + dataMigrationStatusService.savePosition(100l); assertNotNull(dataMigrationStatusService.getStatus()); assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100"); assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), "100");
