This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new dff690a ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation.
dff690a is described below
commit dff690a00e3ef690ffd584b07d243586d64b31b7
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri Mar 20 13:06:01 2020 -0700
ATLAS-3674: ZipFileMigrationImporter: Set Shell Entity Creation.
---
.../atlas/repository/impexp/ZipSourceDirect.java | 8 ++-----
.../migration/DataMigrationStatusService.java | 4 ++--
.../graph/v2/bulkimport/pc/EntityConsumer.java | 7 +++++-
.../v2/bulkimport/pc/EntityCreationManager.java | 28 ++++++++++------------
.../impexp/DataMigrationStatusServiceTest.java | 2 +-
5 files changed, 23 insertions(+), 26 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index 75b8e9e..04342fa 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -175,12 +175,8 @@ public class ZipSourceDirect implements EntityImportStream
{
@Override
public void setPosition(int index) {
- try {
- for (int i = 0; i < index; i++) {
- moveNextEntry();
- }
- } catch (IOException e) {
- LOG.error("Error setting position: {}. Position may be beyond the stream size.", index);
+ for (int i = 0; i < index; i++) {
+ moveNext();
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
index 8c7a3a8..b891684 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -99,8 +99,8 @@ public class DataMigrationStatusService {
this.status = null;
}
- public void savePosition(String position) {
- this.status.setCurrentIndex(Long.valueOf(position));
+ public void savePosition(Long position) {
+ this.status.setCurrentIndex(position);
this.migrationStatusVertexManagement.updateVertexPartial(this.status);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index d0fac10..fc7d392 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -90,6 +90,8 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo
entityWithExtInfo, long currentCount) {
try {
RequestContext.get().setImportInProgress(true);
+ RequestContext.get().setCreateShellEntityForNonExistingReference(true);
+
AtlasEntityStreamForImport oneEntityStream = new
AtlasEntityStreamForImport(entityWithExtInfo, null);
LOG.debug("Processing: {}", currentCount);
@@ -163,7 +165,10 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
}
private void retryProcessEntity(int retryCount) {
- LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+ if (LOG.isDebugEnabled() || retryCount > 1) {
+ LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+ }
+
for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
processEntity(e, counter.get());
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 177b563..734add6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -35,7 +35,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo>
extends WorkItemManag
private static final String WORKER_PREFIX = "migration-import";
private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000;
// 5 min
- private final StatusReporter<String, String> statusReporter;
+ private final StatusReporter<String, Long> statusReporter;
private final AtlasImportResult importResult;
private final DataMigrationStatusService dataMigrationStatusService;
private String currentTypeName;
@@ -51,7 +51,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo>
extends WorkItemManag
}
public long read(EntityImportStream entityStream) {
- long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex();
+ long currentIndex = entityStream.getPosition();
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
this.entityImportStream = entityStream;
this.dataMigrationStatusService.setStatus("IN_PROGRESS");
@@ -68,6 +68,8 @@ public class EntityCreationManager<AtlasEntityWithExtInfo>
extends WorkItemManag
break;
}
}
+
+ this.dataMigrationStatusService.setStatus("DONE");
return currentIndex;
}
@@ -83,7 +85,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo>
extends WorkItemManag
}
setCurrentTypeName(typeName);
- statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+ statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), currentIndex);
super.checkProduce(entityWithExtInfo);
extractResults();
}
@@ -98,25 +100,19 @@ public class EntityCreationManager<AtlasEntityWithExtInfo>
extends WorkItemManag
}
private void logStatus() {
- String ack = statusReporter.ack();
- if (StringUtils.isEmpty(ack)) {
- return;
- }
-
- String[] split = ack.split(":");
- if (split.length == 0 || split.length < 2) {
+ Long ack = statusReporter.ack();
+ if (ack == null) {
return;
}
- importResult.incrementMeticsCounter(split[0]);
- String currentPosition = split[1];
- dataMigrationStatusService.savePosition(currentPosition);
- this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent());
+ importResult.incrementMeticsCounter(getCurrentTypeName());
+ dataMigrationStatusService.savePosition(ack);
+ this.currentPercent = updateImportMetrics(getCurrentTypeName(), ack, this.entityImportStream.size(), getCurrentPercent());
}
- private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
+ private static float updateImportMetrics(String typeNameGuid, long currentIndex, int streamSize, float currentPercent) {
String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
- return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+ return BulkImporterImpl.updateImportProgress(LOG, (int) currentIndex, streamSize, currentPercent, lastEntityImported);
}
private String getCurrentTypeName() {
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
index bf1d9a0..5f1cd0b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -54,7 +54,7 @@ public class DataMigrationStatusServiceTest {
assertEquals(ret.getTotalCount(), expected.getTotalCount());
assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
- dataMigrationStatusService.savePosition("100");
+ dataMigrationStatusService.savePosition(100l);
assertNotNull(dataMigrationStatusService.getStatus());
assertNotNull(dataMigrationStatusService.getStatus().getCurrentIndex(), "100");
assertNotNull(dataMigrationStatusService.getCreate(expected).getCurrentIndex(),
"100");