This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new dff690a  ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation.
dff690a is described below

commit dff690a00e3ef690ffd584b07d243586d64b31b7
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri Mar 20 13:06:01 2020 -0700

    ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation.
---
 .../atlas/repository/impexp/ZipSourceDirect.java   |  8 ++-----
 .../migration/DataMigrationStatusService.java      |  4 ++--
 .../graph/v2/bulkimport/pc/EntityConsumer.java     |  7 +++++-
 .../v2/bulkimport/pc/EntityCreationManager.java    | 28 ++++++++++------------
 .../impexp/DataMigrationStatusServiceTest.java     |  2 +-
 5 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index 75b8e9e..04342fa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -175,12 +175,8 @@ public class ZipSourceDirect implements EntityImportStream 
{
 
     @Override
     public void setPosition(int index) {
-        try {
-            for (int i = 0; i < index; i++) {
-                moveNextEntry();
-            }
-        } catch (IOException e) {
-            LOG.error("Error setting position: {}. Position may be beyond the 
stream size.", index);
+        for (int i = 0; i < index; i++) {
+            moveNext();
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
index 8c7a3a8..b891684 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -99,8 +99,8 @@ public class DataMigrationStatusService {
         this.status = null;
     }
 
-    public void savePosition(String position) {
-        this.status.setCurrentIndex(Long.valueOf(position));
+    public void savePosition(Long position) {
+        this.status.setCurrentIndex(position);
         this.migrationStatusVertexManagement.updateVertexPartial(this.status);
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index d0fac10..fc7d392 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -90,6 +90,8 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     private void processEntity(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo, long currentCount) {
         try {
             RequestContext.get().setImportInProgress(true);
+            
RequestContext.get().setCreateShellEntityForNonExistingReference(true);
+
             AtlasEntityStreamForImport oneEntityStream = new 
AtlasEntityStreamForImport(entityWithExtInfo, null);
 
             LOG.debug("Processing: {}", currentCount);
@@ -163,7 +165,10 @@ public class EntityConsumer extends 
WorkItemConsumer<AtlasEntity.AtlasEntityWith
     }
 
     private void retryProcessEntity(int retryCount) {
-        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", 
entityBuffer.size(), retryCount);
+        if (LOG.isDebugEnabled() || retryCount > 1) {
+            LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", 
entityBuffer.size(), retryCount);
+        }
+
         for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
             processEntity(e, counter.get());
         }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 177b563..734add6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -35,7 +35,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
     private static final String WORKER_PREFIX = "migration-import";
     private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; 
// 5 min
 
-    private final StatusReporter<String, String> statusReporter;
+    private final StatusReporter<String, Long> statusReporter;
     private final AtlasImportResult importResult;
     private final DataMigrationStatusService dataMigrationStatusService;
     private String currentTypeName;
@@ -51,7 +51,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
     }
 
     public long read(EntityImportStream entityStream) {
-        long currentIndex = 
this.dataMigrationStatusService.getStatus().getCurrentIndex();
+        long currentIndex = entityStream.getPosition();
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
         this.entityImportStream = entityStream;
         this.dataMigrationStatusService.setStatus("IN_PROGRESS");
@@ -68,6 +68,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
                 break;
             }
         }
+
+        this.dataMigrationStatusService.setStatus("DONE");
         return currentIndex;
     }
 
@@ -83,7 +85,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
         }
 
         setCurrentTypeName(typeName);
-        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), 
String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), 
currentIndex));
+        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), 
currentIndex);
         super.checkProduce(entityWithExtInfo);
         extractResults();
     }
@@ -98,25 +100,19 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> 
extends WorkItemManag
     }
 
     private void logStatus() {
-        String ack = statusReporter.ack();
-        if (StringUtils.isEmpty(ack)) {
-            return;
-        }
-
-        String[] split = ack.split(":");
-        if (split.length == 0 || split.length < 2) {
+        Long ack = statusReporter.ack();
+        if (ack == null) {
             return;
         }
 
-        importResult.incrementMeticsCounter(split[0]);
-        String currentPosition = split[1];
-        dataMigrationStatusService.savePosition(currentPosition);
-        this.currentPercent = updateImportMetrics(split[0], 
Integer.parseInt(currentPosition), this.entityImportStream.size(), 
getCurrentPercent());
+        importResult.incrementMeticsCounter(getCurrentTypeName());
+        dataMigrationStatusService.savePosition(ack);
+        this.currentPercent = updateImportMetrics(getCurrentTypeName(), ack, 
this.entityImportStream.size(), getCurrentPercent());
     }
 
-    private static float updateImportMetrics(String typeNameGuid, int 
currentIndex, int streamSize, float currentPercent) {
+    private static float updateImportMetrics(String typeNameGuid, long 
currentIndex, int streamSize, float currentPercent) {
         String lastEntityImported = 
String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
-        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, 
streamSize, currentPercent, lastEntityImported);
+        return BulkImporterImpl.updateImportProgress(LOG, (int) currentIndex, 
streamSize, currentPercent, lastEntityImported);
     }
 
     private String getCurrentTypeName() {
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
index bf1d9a0..5f1cd0b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -54,7 +54,7 @@ public class DataMigrationStatusServiceTest {
         assertEquals(ret.getTotalCount(), expected.getTotalCount());
         assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
 
-        dataMigrationStatusService.savePosition("100");
+        dataMigrationStatusService.savePosition(100l);
         assertNotNull(dataMigrationStatusService.getStatus());
         
assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100");
         
assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), 
"100");

Reply via email to