Repository: atlas Updated Branches: refs/heads/branch-0.8 fd629982f -> 833fb20a1
ATLAS-2862: Incremental Export now uses request context to determine change marker. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/833fb20a Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/833fb20a Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/833fb20a Branch: refs/heads/branch-0.8 Commit: 833fb20a182a1a9e43357e66161aaf094302bc72 Parents: fd62998 Author: Ashutosh Mestry <[email protected]> Authored: Mon Sep 10 11:51:49 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Mon Sep 10 11:51:49 2018 -0700 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasExportRequest.java | 69 +++++++++++++---- .../atlas/model/impexp/AtlasExportResult.java | 43 +++-------- .../atlas/model/impexp/AtlasImportResult.java | 2 +- .../atlas/repository/impexp/AuditsWriter.java | 4 +- .../atlas/repository/impexp/ExportService.java | 79 ++++++-------------- .../apache/atlas/repository/impexp/ZipSink.java | 5 +- .../impexp/ExportIncrementalTest.java | 6 +- .../impexp/ImportTransformsShaperTest.java | 1 + .../impexp/ReplicationEntityAttributeTest.java | 2 +- .../atlas/repository/impexp/ZipSinkTest.java | 2 +- .../stocksDB-Entities/export-incremental.json | 2 +- 11 files changed, 100 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index 96a6e88..106a4a0 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -20,6 +20,7 @@ package org.apache.atlas.model.impexp; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.commons.collections.MapUtils; import org.codehaus.jackson.annotate.JsonAutoDetect; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -46,19 +47,19 @@ public class AtlasExportRequest implements Serializable { private static final long serialVersionUID = 1L; - public static final String OPTION_FETCH_TYPE = "fetchType"; - public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; - public static final String OPTION_SKIP_LINEAGE = "skipLineage"; - public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; - public static final String FETCH_TYPE_FULL = "full"; - public static final String FETCH_TYPE_CONNECTED = "connected"; - public static final String FETCH_TYPE_INCREMENTAL = "incremental"; - public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime"; - public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; - public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; - public static final String MATCH_TYPE_CONTAINS = "contains"; - public static final String MATCH_TYPE_MATCHES = "matches"; - public static final String MATCH_TYPE_FOR_TYPE = "forType"; + public static final String OPTION_FETCH_TYPE = "fetchType"; + public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; + public static final String OPTION_SKIP_LINEAGE = "skipLineage"; + public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo"; + public static final String FETCH_TYPE_FULL = "full"; + public static final String FETCH_TYPE_CONNECTED = "connected"; + public static final String FETCH_TYPE_INCREMENTAL = "incremental"; + public static final String FETCH_TYPE_INCREMENTAL_CHANGE_MARKER = "changeMarker"; + public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; + public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; + public static final String MATCH_TYPE_CONTAINS = "contains"; + public static final String MATCH_TYPE_MATCHES = "matches"; + public static final String MATCH_TYPE_FOR_TYPE = "forType"; private List<AtlasObjectId> itemsToExport = new ArrayList<>(); private Map<String, Object> options = new HashMap<>(); @@ -79,6 +80,48 @@ public class AtlasExportRequest implements Serializable { this.options = options; } + public String getMatchTypeOptionValue() { + String matchType = null; + + if (MapUtils.isNotEmpty(getOptions())) { + if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { + matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); + } + } + + return matchType; + } + + public String getFetchTypeOptionValue() { + if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) { + return FETCH_TYPE_FULL; + } + + Object o = getOptions().get(OPTION_FETCH_TYPE); + if (o instanceof String) { + return (String) o; + } + + return FETCH_TYPE_FULL; + } + + public boolean getSkipLineageOptionValue() { + if(!getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)) { + return false; + } + + Object o = getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE); + if(o instanceof String) { + return Boolean.parseBoolean((String) o); + } + + if(o instanceof Boolean) { + return (Boolean) o; + } + + return false; + } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index fd68712..ed9b644 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -18,7 +18,6 @@ package org.apache.atlas.model.impexp; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.codehaus.jackson.annotate.JsonAutoDetect; @@ -61,14 +60,15 @@ public class AtlasExportResult implements Serializable { private AtlasExportData data; private OperationStatus operationStatus; private String sourceClusterName; - private long lastModifiedTimestamp; + private long changeMarker; public AtlasExportResult() { - this(null, null, null, null, System.currentTimeMillis()); + this(null, null, null, null, System.currentTimeMillis(), 0L); } public AtlasExportResult(AtlasExportRequest request, - String userName, String clientIpAddress, String hostName, long timeStamp) { + String userName, String clientIpAddress, String hostName, long timeStamp, + long changeMarker) { this.request = request; this.userName = userName; this.clientIpAddress = clientIpAddress; @@ -77,6 +77,7 @@ public class AtlasExportResult implements Serializable { this.metrics = new HashMap<>(); this.operationStatus = OperationStatus.FAIL; this.data = new AtlasExportData(); + this.changeMarker = changeMarker; } public AtlasExportRequest getRequest() { @@ -135,12 +136,12 @@ public class AtlasExportResult implements Serializable { this.data = data; } - public void setLastModifiedTimestamp(long lastModifiedTimestamp) { - this.lastModifiedTimestamp = lastModifiedTimestamp; + public void setChangeMarker(long changeMarker) { + this.changeMarker = changeMarker; } - public long getLastModifiedTimestamp() { - return this.lastModifiedTimestamp; + public long getChangeMarker() { + return this.changeMarker; } public OperationStatus getOperationStatus() { @@ -169,22 +170,6 @@ public class AtlasExportResult implements Serializable { metrics.put(key, currentValue + incrementBy); } - public AtlasExportResult shallowCopy() { - AtlasExportResult result = new AtlasExportResult(); - - result.setRequest(getRequest()); - result.setUserName(getUserName()); - result.setClientIpAddress(getClientIpAddress()); - result.setHostName(getHostName()); - result.setTimeStamp(getTimeStamp()); - result.setMetrics(getMetrics()); - result.setOperationStatus(getOperationStatus()); - result.setSourceClusterName(getSourceClusterName()); - result.setLastModifiedTimestamp(getLastModifiedTimestamp()); - - return result; - } - public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); @@ -195,14 +180,13 @@ public class AtlasExportResult implements Serializable { sb.append(", userName='").append(userName).append("'"); sb.append(", clientIpAddress='").append(clientIpAddress).append("'"); sb.append(", hostName='").append(hostName).append("'"); - sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'"); + sb.append(", changeMarker='").append(changeMarker).append("'"); sb.append(", sourceCluster='").append(sourceClusterName).append("'"); sb.append(", timeStamp='").append(timeStamp).append("'"); sb.append(", metrics={"); AtlasBaseTypeDef.dumpObjects(metrics, sb); sb.append("}"); - sb.append(", data='").append(data).append("'"); sb.append(", operationStatus='").append(operationStatus).append("'"); sb.append("}"); @@ -233,13 +217,11 @@ public class AtlasExportResult implements Serializable { private static final long serialVersionUID = 1L; private AtlasTypesDef typesDef; - private Map<String, AtlasEntity> entities; private List<String> entityCreationOrder; public AtlasExportData() { typesDef = new AtlasTypesDef(); - entities = new HashMap<>(); entityCreationOrder = new ArrayList<>(); } @@ -247,10 +229,6 @@ public class AtlasExportResult implements Serializable { public void setTypesDef(AtlasTypesDef typesDef) { this.typesDef = typesDef; } - public Map<String, AtlasEntity> getEntities() { return entities; } - - public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; } - public List<String> getEntityCreationOrder() { return entityCreationOrder; } public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; } @@ -264,7 +242,6 @@ public class AtlasExportResult implements Serializable { sb.append("AtlasExportData {"); sb.append(", typesDef={").append(typesDef).append("}"); sb.append(", entities={"); - AtlasBaseTypeDef.dumpObjects(entities, sb); sb.append("}"); sb.append(", entityCreationOrder={"); AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb); http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index f066688..59ffded 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -147,7 +147,7 @@ public class AtlasImportResult { } public void setExportResult(AtlasExportResult exportResult) { - this.exportResultWithoutData = exportResult.shallowCopy(); + this.exportResultWithoutData = exportResult; } public StringBuilder toString(StringBuilder sb) { http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java index 7fb2a3b..08576fe 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java @@ -145,7 +145,7 @@ public class AuditsWriter { } updateReplicationAttribute(replicationOptionState, targetServerName, - entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getLastModifiedTimestamp()); + entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker()); } private void saveServers() throws AtlasBaseException { @@ -183,7 +183,7 @@ public class AuditsWriter { } updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids, - Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getLastModifiedTimestamp()); + Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker()); } private void saveServers() throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index b507002..aded67c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.RequestContext; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; @@ -85,7 +87,9 @@ public class ExportService { public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { long startTime = System.currentTimeMillis(); - AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime); + AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, + hostName, startTime, getCurrentChangeMarker()); + ExportContext context = new ExportContext(atlasGraph, result, exportSink); exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context); @@ -109,6 +113,10 @@ public class ExportService { return context.result; } + private long getCurrentChangeMarker() { + return Math.min(RequestContextV1.earliestActiveRequestTime(), RequestContext.earliestActiveRequestTime()); + } + private void updateSinkWithOperationMetrics(String userName, ExportContext context, AtlasExportResult.OperationStatus[] statuses, long startTime, long endTime) throws AtlasBaseException { @@ -117,7 +125,6 @@ public class ExportService { context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed); context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); - context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp); context.result.setOperationStatus(getOverallOperationStatus(statuses)); context.result.incrementMeticsCounter("duration", duration); auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder()); @@ -354,7 +361,7 @@ public class ExportService { debugLog("<== processEntity({})", guid); } - private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) { switch (context.fetchType) { case CONNECTED: getEntityGuidsForConnectedFetch(entity, context, direction); @@ -367,7 +374,7 @@ public class ExportService { } } - private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) { if (direction == null || direction == TraversalDirection.UNKNOWN) { getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); } else { @@ -527,7 +534,6 @@ public class ExportService { BOTH; } - public enum ExportFetchType { FULL(FETCH_TYPE_FULL), CONNECTED(FETCH_TYPE_CONNECTED), @@ -569,8 +575,7 @@ public class ExportService { private final ExportFetchType fetchType; private final String matchType; private final boolean skipLineage; - private final long lastModifiedTimestampRequested; - private long newestLastModifiedTimestamp; + private final long changeMarker; private int progressReportCount = 0; @@ -580,45 +585,16 @@ public class ExportService { scriptEngine = atlasGraph.getGremlinScriptEngine(); bindings = new HashMap<>(); - fetchType = getFetchType(result.getRequest()); - matchType = getMatchType(result.getRequest()); - skipLineage = getOptionSkipLineage(result.getRequest()); - this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest()); - this.newestLastModifiedTimestamp = 0; - } - - private ExportFetchType getFetchType(AtlasExportRequest request) { - Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null; - - if (fetchOption instanceof String) { - return ExportFetchType.from((String) fetchOption); - } else if (fetchOption instanceof ExportFetchType) { - return (ExportFetchType) fetchOption; - } - - return ExportFetchType.FULL; - } - - private String getMatchType(AtlasExportRequest request) { - String matchType = null; - - if (MapUtils.isNotEmpty(request.getOptions())) { - if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { - matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); - } - } - - return matchType; - } - - private boolean getOptionSkipLineage(AtlasExportRequest request) { - return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) && - (boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE); + fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue()); + matchType = result.getRequest().getMatchTypeOptionValue(); + skipLineage = result.getRequest().getSkipLineageOptionValue(); + this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest()); } - private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) { - if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) { - return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString()); + private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) { + if(fetchType == ExportFetchType.INCREMENTAL && + request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) { + return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString()); } return 0L; @@ -675,19 +651,7 @@ public class ExportService { return true; } - long entityModificationTimestamp = entity.getUpdateTime().getTime(); - updateNewestLastModifiedTimestamp(entityModificationTimestamp); - return doesTimestampQualify(entityModificationTimestamp); - } - - private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) { - if(newestLastModifiedTimestamp < entityModificationTimestamp) { - newestLastModifiedTimestamp = entityModificationTimestamp; - } - } - - private boolean doesTimestampQualify(long modificationTimestamp) { - return lastModifiedTimestampRequested < modificationTimestamp; + return changeMarker <= entity.getUpdateTime().getTime(); } public boolean getSkipLineage() { @@ -695,7 +659,6 @@ public class ExportService { } public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { - updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime()); sink.add(entityWithExtInfo); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java index 17ebbf1..9928c54 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java @@ -36,10 +36,11 @@ import java.util.zip.ZipOutputStream; public class ZipSink { private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class); + private static String FILE_EXTENSION_JSON = ".json"; + private ZipOutputStream zipOutputStream; final Set<String> guids = new HashSet<>(); - public ZipSink(OutputStream outputStream) { zipOutputStream = new ZipOutputStream(outputStream); } @@ -92,7 +93,7 @@ public class ZipSink { private void saveToZip(String fileName, String jsonData) throws AtlasBaseException { try { - addToZipStream(fileName.toString() + ".json", jsonData); + addToZipStream(fileName.toString() + FILE_EXTENSION_JSON, jsonData); } catch (IOException e) { throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e); } http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index ed4fc37..75ef77c 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -41,7 +41,7 @@ import javax.inject.Inject; import java.io.IOException; import java.util.Map; -import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME; +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters; @@ -98,7 +98,7 @@ public class ExportIncrementalTest extends ExportImportTestBase { } private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException { - return source.getExportResult().getLastModifiedTimestamp(); + return source.getExportResult().getChangeMarker(); } @Test(dependsOnMethods = "atT0_ReturnsAllEntities") @@ -161,7 +161,7 @@ public class ExportIncrementalTest extends ExportImportTestBase { private AtlasExportRequest getIncrementalRequest(long timestamp) { try { AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class); - request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp); + request.getOptions().put(FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, timestamp); return request; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java index f894553..06bdaa6 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java @@ -72,6 +72,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase { assertNotNull(classification); assertEntities(result.getProcessedEntities(), TAG_NAME); } + private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException { for (String guid : entityGuids) { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid); http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java index 719d6ca..8de7368 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java @@ -159,7 +159,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase { long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid()); assertTrue(cluster.getAdditionalInfo().size() > 0); - assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp()); + assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getChangeMarker()); } private AtlasExportRequest getUpdateMetaInfoUpdateRequest() { http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java index e8bbeb5..0988a30 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java @@ -59,7 +59,7 @@ public class ZipSinkTest { itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default")); request.setItemsToExport(itemsToExport); - defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100); + defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100, 0L); return defaultExportResult; } http://git-wip-us.apache.org/repos/asf/atlas/blob/833fb20a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json index c2bc867..fdd3b01 100644 --- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json +++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json @@ -6,6 +6,6 @@ ], "options": { "fetchType": "incremental", - "fromTime": 0 + "changeMarker": 0 } }
