Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 64999fdc0 -> b9c4c3e78


ATLAS-2738: Export Process: Support for incremental export.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/b9c4c3e7
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/b9c4c3e7
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/b9c4c3e7

Branch: refs/heads/branch-0.8
Commit: b9c4c3e7862ddac02a6f6dfc6a20b2318ffbcf8a
Parents: 64999fd
Author: Ashutosh Mestry <[email protected]>
Authored: Wed Aug 8 12:27:20 2018 -0700
Committer: Ashutosh Mestry <[email protected]>
Committed: Wed Aug 8 12:27:20 2018 -0700

----------------------------------------------------------------------
 .../atlas/model/impexp/AtlasExportRequest.java  |   2 +
 .../atlas/model/impexp/AtlasExportResult.java   |  11 +-
 .../atlas/repository/impexp/ExportService.java  | 103 +++++++++--
 .../repository/impexp/ExportImportTestBase.java |  18 ++
 .../impexp/ExportIncrementalTest.java           | 174 +++++++++++++++++++
 .../impexp/ExportSkipLineageTest.java           |   6 -
 .../impexp/ZipFileResourceTestUtils.java        |   2 +-
 .../stocksDB-Entities/export-incremental.json   |  11 ++
 8 files changed, 304 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index 035216b..96a6e88 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -52,6 +52,8 @@ public class AtlasExportRequest implements Serializable {
     public static final String OPTION_KEY_REPLICATED_TO         = 
"replicatedTo";
     public static final String FETCH_TYPE_FULL                  = "full";
     public static final String FETCH_TYPE_CONNECTED             = "connected";
+    public static final String FETCH_TYPE_INCREMENTAL           = 
"incremental";
+    public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime";
     public static final String MATCH_TYPE_STARTS_WITH           = "startsWith";
     public static final String MATCH_TYPE_ENDS_WITH             = "endsWith";
     public static final String MATCH_TYPE_CONTAINS              = "contains";

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index 4da91a0..85a606c 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -61,7 +61,7 @@ public class AtlasExportResult implements Serializable {
     private AtlasExportData      data;
     private OperationStatus      operationStatus;
     private String               sourceClusterName;
-
+    private long                 lastModifiedTimestamp;
 
     public AtlasExportResult() {
         this(null, null, null, null, System.currentTimeMillis());
@@ -135,6 +135,14 @@ public class AtlasExportResult implements Serializable {
         this.data = data;
     }
 
+    public void setLastModifiedTimestamp(long lastModifiedTimestamp) {
+        this.lastModifiedTimestamp = lastModifiedTimestamp;
+    }
+
+    public long getLastModifiedTimestamp() {
+        return this.lastModifiedTimestamp;
+    }
+
     public OperationStatus getOperationStatus() {
         return operationStatus;
     }
@@ -171,6 +179,7 @@ public class AtlasExportResult implements Serializable {
         sb.append(", userName='").append(userName).append("'");
         sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
         sb.append(", hostName='").append(hostName).append("'");
+        sb.append(", 
lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'");
         sb.append(", sourceCluster='").append(sourceClusterName).append("'");
         sb.append(", timeStamp='").append(timeStamp).append("'");
         sb.append(", metrics={");

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index eeb8735..b15f828 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -38,6 +38,7 @@ import 
org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
@@ -120,6 +121,7 @@ public class ExportService {
         clearContextData(context);
         context.result.setOperationStatus(getOverallOperationStatus(statuses));
         context.result.incrementMeticsCounter("duration", duration);
+        
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
         context.sink.setResult(context.result);
     }
 
@@ -278,7 +280,8 @@ public class ExportService {
     }
 
     private void logInfoStartingEntitiesFound(AtlasObjectId item, 
ExportContext context, List<String> ret) {
-        LOG.info("export(item={}; matchType={}, fetchType={}): found {} 
entities", item, context.matchType, context.fetchType, ret.size());
+        LOG.info("export(item={}; matchType={}, fetchType={}): found {} 
entities: options: {}", item,
+                context.matchType, context.fetchType, ret.size(), 
AtlasType.toJson(context.result.getRequest()));
     }
 
     private void setupBindingsForTypeName(ExportContext context, String 
typeName) {
@@ -327,9 +330,9 @@ public class ExportService {
             TraversalDirection      direction         = 
context.guidDirection.get(guid);
             AtlasEntityWithExtInfo  entityWithExtInfo = 
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
 
-            if(!context.lineageProcessed.contains(guid)) {
-                
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
-            }
+        if (!context.lineageProcessed.contains(guid) && 
context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
+            
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
+        }
 
             addEntity(entityWithExtInfo, context);
             exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), 
context);
@@ -358,6 +361,7 @@ public class ExportService {
                 getEntityGuidsForConnectedFetch(entity, context, direction);
                 break;
 
+            case INCREMENTAL:
             case FULL:
             default:
                 getEntityGuidsForFullFetch(entity, context);
@@ -470,21 +474,31 @@ public class ExportService {
         }
     }
 
-    private void addEntity(AtlasEntityWithExtInfo entity, ExportContext 
context) throws AtlasBaseException {
-        if(context.sink.hasEntity(entity.getEntity().getGuid())) {
+    private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, 
ExportContext context) throws AtlasBaseException {
+        if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) {
             return;
         }
 
-        context.sink.add(entity);
+        if(context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
+            context.addToSink(entityWithExtInfo);
 
-        context.result.incrementMeticsCounter(String.format("entity:%s", 
entity.getEntity().getTypeName()));
-        if(entity.getReferredEntities() != null) {
-            for (AtlasEntity e: entity.getReferredEntities().values()) {
+            context.result.incrementMeticsCounter(String.format("entity:%s", 
entityWithExtInfo.getEntity().getTypeName()));
+            if (entityWithExtInfo.getReferredEntities() != null) {
+                for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
+                    
context.result.incrementMeticsCounter(String.format("entity:%s", 
e.getTypeName()));
+                }
+            }
+
+            context.result.incrementMeticsCounter("entity:withExtInfo");
+        } else {
+            List<AtlasEntity> entities = 
context.getEntitiesWithModifiedTimestamp(entityWithExtInfo);
+            for (AtlasEntity e : entities) {
+                
context.result.getData().getEntityCreationOrder().add(e.getGuid());
+                context.addToSink(new AtlasEntityWithExtInfo(e));
                 
context.result.incrementMeticsCounter(String.format("entity:%s", 
e.getTypeName()));
             }
         }
 
-        context.result.incrementMeticsCounter("entity:withExtInfo");
         context.reportProgress();
     }
 
@@ -516,7 +530,8 @@ public class ExportService {
 
     public enum ExportFetchType {
         FULL(FETCH_TYPE_FULL),
-        CONNECTED(FETCH_TYPE_CONNECTED);
+        CONNECTED(FETCH_TYPE_CONNECTED),
+        INCREMENTAL(FETCH_TYPE_INCREMENTAL);
 
         final String str;
         ExportFetchType(String s) {
@@ -535,6 +550,8 @@ public class ExportService {
     }
 
     static class ExportContext {
+        private static final int REPORTING_THREASHOLD = 1000;
+
         final Set<String>                     guidsProcessed = new HashSet<>();
         final UniqueList<String> guidsToProcess = new UniqueList<>();
         final UniqueList<String>              lineageToProcess = new 
UniqueList<>();
@@ -545,13 +562,15 @@ public class ExportService {
         final Set<String>                     structTypes         = new 
HashSet<>();
         final Set<String>                     enumTypes           = new 
HashSet<>();
         final AtlasExportResult               result;
-        final ZipSink                         sink;
+        private final ZipSink                 sink;
 
         private final ScriptEngine        scriptEngine;
         private final Map<String, Object> bindings;
         private final ExportFetchType     fetchType;
         private final String              matchType;
         private final boolean             skipLineage;
+        private final long                lastModifiedTimestampRequested;
+        private       long                newestLastModifiedTimestamp;
 
         private       int                 progressReportCount = 0;
 
@@ -564,6 +583,8 @@ public class ExportService {
             fetchType    = getFetchType(result.getRequest());
             matchType    = getMatchType(result.getRequest());
             skipLineage  = getOptionSkipLineage(result.getRequest());
+            this.lastModifiedTimestampRequested = 
getLastModifiedTimestamp(fetchType, result.getRequest());
+            this.newestLastModifiedTimestamp = 0;
         }
 
         private ExportFetchType getFetchType(AtlasExportRequest request) {
@@ -595,6 +616,34 @@ public class ExportService {
                     (boolean) 
request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
         }
 
+        private long getLastModifiedTimestamp(ExportFetchType fetchType, 
AtlasExportRequest request) {
+            if(fetchType == ExportFetchType.INCREMENTAL && 
request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME))
 {
+                return 
Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString());
+            }
+
+            return 0L;
+        }
+
+        public List<AtlasEntity> 
getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
+            if(fetchType != ExportFetchType.INCREMENTAL) {
+                return new ArrayList<>();
+            }
+
+            List<AtlasEntity> ret = new ArrayList<>();
+            if(doesTimestampQualify(entityWithExtInfo.getEntity())) {
+                ret.add(entityWithExtInfo.getEntity());
+                return ret;
+            }
+
+            for (AtlasEntity entity : 
entityWithExtInfo.getReferredEntities().values()) {
+                if((doesTimestampQualify(entity))) {
+                    ret.add(entity);
+                }
+            }
+
+            return ret;
+        }
+
         public void clear() {
             guidsToProcess.clear();
             guidsProcessed.clear();
@@ -614,16 +663,40 @@ public class ExportService {
         }
 
         public void reportProgress() {
-
-            if ((guidsProcessed.size() - progressReportCount) > 1000) {
+            if ((guidsProcessed.size() - progressReportCount) > 
REPORTING_THREASHOLD) {
                 progressReportCount = guidsProcessed.size();
 
                 LOG.info("export(): in progress.. number of entities exported: 
{}", this.guidsProcessed.size());
             }
         }
 
+        public boolean doesTimestampQualify(AtlasEntity entity) {
+            if(fetchType != ExportFetchType.INCREMENTAL) {
+                return true;
+            }
+
+            long entityModificationTimestamp = 
entity.getUpdateTime().getTime();
+            updateNewestLastModifiedTimestamp(entityModificationTimestamp);
+            return doesTimestampQualify(entityModificationTimestamp);
+        }
+
+        private void updateNewestLastModifiedTimestamp(long 
entityModificationTimestamp) {
+            if(newestLastModifiedTimestamp < entityModificationTimestamp) {
+                newestLastModifiedTimestamp = entityModificationTimestamp;
+            }
+        }
+
+        private boolean doesTimestampQualify(long modificationTimestamp) {
+            return lastModifiedTimestampRequested < modificationTimestamp;
+        }
+
         public boolean getSkipLineage() {
             return skipLineage;
         }
+
+        public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws 
AtlasBaseException {
+            
updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime());
+            sink.add(entityWithExtInfo);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
index d61b274..fcf90d3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
@@ -24,14 +24,21 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
 import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.testng.SkipException;
 import scala.actors.threadpool.Arrays;
 
+import java.io.IOException;
+
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadEntity;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadHiveModel;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -39,9 +46,20 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 public class ExportImportTestBase {
+    protected static final String ENTITIES_SUB_DIR = "stocksDB-Entities";
+    protected static final String DB_GUID = 
"1637a33e-6512-447b-ade7-249c8cb5344b";
+    protected static final String TABLE_GUID = 
"df122fc3-5555-40f8-a30f-3090b8a622f8";
+    protected static final String TABLE_TABLE_GUID = 
"6f3b305a-c459-4ae4-b651-aee0deb0685f";
+    protected static final String TABLE_VIEW_GUID = 
"56415119-7cb0-40dd-ace8-1e50efd54991";
+    protected static final String COLUMN_GUID_HIGH = 
"f87a5320-1529-4369-8d63-b637ebdf2c1c";
 
     protected DeleteHandlerV1 deleteHandler = mock(SoftDeleteHandlerV1.class);
 
+    protected void basicSetup(AtlasTypeDefStore typeDefStore, 
AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
+        loadBaseModel(typeDefStore, typeRegistry);
+        loadHiveModel(typeDefStore, typeRegistry);
+    }
+
     protected int createEntities(AtlasEntityStoreV1 entityStore, String 
subDir, String entityFileNames[]) {
         for (String fileName : entityFileNames) {
             createAtlasEntity(entityStore, loadEntity(subDir, fileName));

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
new file mode 100644
index 0000000..86ab222
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+
+import com.google.common.collect.ImmutableList;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class ExportIncrementalTest extends ExportImportTestBase {
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    ExportService exportService;
+
+    @Inject
+    ClusterService clusterService;
+
+    @Inject
+    private AtlasEntityStoreV1 entityStore;
+
+    private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
+    private long nextTimestamp;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        basicSetup(typeDefStore, typeRegistry);
+        createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", 
"table-columns"});
+        final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID};
+        verifyCreatedEntities(entityStore, entityGuids, 2);
+    }
+
+    @BeforeMethod
+    public void setupTest() {
+        RequestContextV1.clear();
+        RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
+    }
+
+    @Test
+    public void atT0_ReturnsAllEntities() throws AtlasBaseException {
+        final int expectedEntityCount = 2;
+
+        AtlasExportRequest request = getIncrementalRequest(0);
+        ZipSource source = runExportWithParameters(exportService, request);
+        AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, 
expectedEntityCount);
+
+        int count = 0;
+        for (Map.Entry<String, AtlasEntity> entry : 
entities.getReferredEntities().entrySet()) {
+            assertNotNull(entry.getValue());
+            count++;
+        }
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+        assertEquals(count, expectedEntityCount);
+    }
+
+    private long updateTimesampForNextIncrementalExport(ZipSource source) 
throws AtlasBaseException {
+        return source.getExportResult().getLastModifiedTimestamp();
+    }
+
+    @Test(dependsOnMethods = "atT0_ReturnsAllEntities")
+    public void atT1_NewClassificationAttachedToTable_ReturnsChangedTable() 
throws AtlasBaseException {
+        final int expectedEntityCount = 1;
+
+        AtlasClassificationType ct = createNewClassification();
+        entityStore.addClassifications(TABLE_GUID, 
ImmutableList.of(ct.createDefaultValue()));
+
+        AtlasExportRequest request = getIncrementalRequest(nextTimestamp);
+        ZipSource source = runExportWithParameters(exportService, request);
+        AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, 
expectedEntityCount);
+
+        AtlasEntity entity = null;
+        for (Map.Entry<String, AtlasEntity> entry : 
entities.getReferredEntities().entrySet()) {
+            entity = entry.getValue();
+            assertNotNull(entity);
+            break;
+        }
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+        assertEquals(entity.getGuid(),TABLE_GUID);
+    }
+
+    private AtlasClassificationType createNewClassification() {
+        createTypes(typeDefStore, 
ENTITIES_SUB_DIR,"typesDef-new-classification");
+        return typeRegistry.getClassificationTypeByName("T1");
+    }
+
+    @Test(dependsOnMethods = 
"atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
+    public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() 
throws AtlasBaseException {
+        final int expectedEntityCount = 1;
+
+        AtlasEntity.AtlasEntityWithExtInfo tableEntity = 
entityStore.getById(TABLE_GUID);
+        long preExportTableEntityTimestamp = 
tableEntity.getEntity().getUpdateTime().getTime();
+
+        entityStore.addClassifications(COLUMN_GUID_HIGH, 
ImmutableList.of(typeRegistry.getClassificationTypeByName("T1").createDefaultValue()));
+
+        ZipSource source = runExportWithParameters(exportService, 
getIncrementalRequest(nextTimestamp));
+        AtlasEntity.AtlasEntityWithExtInfo entities = getEntities(source, 
expectedEntityCount);
+
+        for (Map.Entry<String, AtlasEntity> entry : 
entities.getReferredEntities().entrySet()) {
+            AtlasEntity entity = entry.getValue();
+            assertNotNull(entity.getGuid());
+            break;
+        }
+
+        long postUpdateTableEntityTimestamp = 
tableEntity.getEntity().getUpdateTime().getTime();
+        assertEquals(preExportTableEntityTimestamp, 
postUpdateTableEntityTimestamp);
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+    }
+
+    @Test(dependsOnMethods = 
"atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn")
+    public void exportingWithSameParameters_Succeeds() {
+        ZipSource source = runExportWithParameters(exportService, 
getIncrementalRequest(nextTimestamp));
+
+        assertNotNull(source);
+    }
+
+    private AtlasExportRequest getIncrementalRequest(long timestamp) {
+        try {
+            AtlasExportRequest request = 
TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, 
EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
+            request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, 
timestamp);
+
+            return request;
+        } catch (IOException e) {
+            throw new SkipException(String.format("getIncrementalRequest: '%s' 
could not be laoded.", EXPORT_REQUEST_INCREMENTAL));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
index 5ae86ef..c4682b8 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportSkipLineageTest.java
@@ -53,12 +53,6 @@ import static org.testng.AssertJUnit.fail;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ExportSkipLineageTest extends ExportImportTestBase {
-    private final String ENTITIES_SUB_DIR = "stocksDB-Entities";
-    private final String DB_GUID = "1637a33e-6512-447b-ade7-249c8cb5344b";
-    private final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8";
-    private final String TABLE_TABLE_GUID = 
"6f3b305a-c459-4ae4-b651-aee0deb0685f";
-    private final String TABLE_VIEW_GUID = 
"56415119-7cb0-40dd-ace8-1e50efd54991";
-
     @Inject
     AtlasTypeRegistry typeRegistry;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index f465d67..720aa14 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -196,7 +196,7 @@ public class ZipFileResourceTestUtils {
         AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = new 
AtlasEntity.AtlasEntityWithExtInfo();
         try {
             int count = 0;
-            for(String s : source.getCreationOrder()) {
+            for (String s : source.getCreationOrder()) {
                 AtlasEntity entity = source.getByGuid(s);
                 entityWithExtInfo.addReferredEntity(s, entity);
                 count++;

http://git-wip-us.apache.org/repos/asf/atlas/blob/b9c4c3e7/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
new file mode 100644
index 0000000..c2bc867
--- /dev/null
+++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
@@ -0,0 +1,11 @@
+{
+  "itemsToExport": [
+    {
+      "typeName": "hive_db", "uniqueAttributes": { "qualifiedName": 
"stocks_base@cl1" }
+    }
+  ],
+  "options": {
+    "fetchType": "incremental",
+    "fromTime": 0
+  }
+}

Reply via email to