This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c3a932f  ATLAS-3324 Incremental export with hive_table for table-level 
replication.
c3a932f is described below

commit c3a932fb5bed7589e151607b430f568762eb2b35
Author: nikhilbonte <[email protected]>
AuthorDate: Tue Jul 9 18:51:42 2019 +0530

    ATLAS-3324 Incremental export with hive_table for table-level replication.
    
    Signed-off-by: nixonrodrigues <[email protected]>
    (cherry picked from commit 8ada5d400ea994a0ffd3c67fa0f66719de1e4138)
---
 .../atlas/repository/impexp/EntitiesExtractor.java |  3 +
 .../atlas/repository/impexp/ExportService.java     | 20 ++++-
 .../impexp/IncrementalExportEntityProvider.java    |  9 ++-
 .../repository/impexp/ExportIncrementalTest.java   | 88 ++++++++++++++++++++++
 4 files changed, 118 insertions(+), 2 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
index 15cb111..da5cf37 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -55,6 +55,9 @@ public class EntitiesExtractor {
                 if (context.isHiveDBIncrementalSkipLineage()) {
                     extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, 
context);
                     break;
+                } else if (context.isHiveTableIncrementalSkipLineage()) {
+                    extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, 
context);
+                    break;
                 }
 
             case FULL:
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 5055607..6016723 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -36,6 +36,7 @@ import 
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -332,6 +333,7 @@ public class ExportService {
     static class ExportContext {
         private static final int REPORTING_THREASHOLD = 1000;
         private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+        private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
 
 
         final UniqueList<String>              entityCreationOrder = new 
UniqueList<>();
@@ -353,6 +355,7 @@ public class ExportService {
         final long                        changeMarker;
         boolean isSkipConnectedFetch;
         private final boolean isHiveDBIncremental;
+        private final boolean isHiveTableIncremental;
 
         private       int                 progressReportCount = 0;
 
@@ -364,11 +367,12 @@ public class ExportService {
             skipLineage  = result.getRequest().getSkipLineageOptionValue();
             this.changeMarker = 
result.getRequest().getChangeTokenFromOptions();
             this.isHiveDBIncremental = 
checkHiveDBIncrementalSkipLineage(result.getRequest());
+            this.isHiveTableIncremental = 
checkHiveTableIncrementalSkipLineage(result.getRequest());
             this.isSkipConnectedFetch = false;
         }
 
         private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest 
request) {
-            if(request.getItemsToExport().size() == 0) {
+            if(CollectionUtils.isEmpty(request.getItemsToExport())) {
                 return false;
             }
 
@@ -377,6 +381,16 @@ public class ExportService {
                     request.getSkipLineageOptionValue();
         }
 
+        private boolean 
checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
+            if(CollectionUtils.isEmpty(request.getItemsToExport())) {
+                return false;
+            }
+
+            return 
request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE)
 &&
+                    
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL)
 &&
+                    request.getSkipLineageOptionValue();
+        }
+
         public List<AtlasEntity> 
getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
             if(fetchType != ExportFetchType.INCREMENTAL) {
                 return new ArrayList<>();
@@ -442,6 +456,10 @@ public class ExportService {
             return isHiveDBIncremental;
         }
 
+        public boolean isHiveTableIncrementalSkipLineage() {
+            return isHiveTableIncremental;
+        }
+
         public void addToEntityCreationOrder(String guid) {
             entityCreationOrder.add(guid);
         }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 256d9de..be07b8b 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -48,6 +48,10 @@ public class IncrementalExportEntityProvider implements 
ExtractStrategy {
     private static final String TRANSFORM_CLAUSE = 
".project('__guid').by('__guid').dedup().toList()";
     private static final String TIMESTAMP_CLAUSE = 
".has('__modificationTimestamp', gt(modificationTimestamp))";
 
+    private static final String QUERY_TABLE_DB = QUERY_DB + 
".out('__hive_table.db')";
+    private static final String QUERY_TABLE_SD = QUERY_DB + 
".out('__hive_table.sd')";
+    private static final String QUERY_TABLE_COLUMNS = QUERY_DB + 
".out('__hive_table.columns')";
+
     private ScriptEngine scriptEngine;
 
     @Inject
@@ -67,7 +71,10 @@ public class IncrementalExportEntityProvider implements 
ExtractStrategy {
 
     @Override
     public void connectedFetch(AtlasEntity entity, ExportService.ExportContext 
context) {
-
+        //starting entity is hive_table
+        context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), 
QUERY_TABLE_DB, context.changeMarker));
+        context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), 
QUERY_TABLE_SD, context.changeMarker));
+        context.guidsToProcess.addAll(fetchGuids(entity.getGuid(), 
QUERY_TABLE_COLUMNS, context.changeMarker));
     }
 
     public void populate(String dbEntityGuid, long timeStamp, 
UniqueList<String> guidsToProcess) {
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 1d44081..7aeb6a7 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -26,29 +26,37 @@ import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
 import org.apache.atlas.repository.util.UniqueList;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.ITestContext;
 import org.testng.SkipException;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
+import org.testng.annotations.DataProvider;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ExportIncrementalTest extends ExportImportTestBase {
@@ -62,6 +70,9 @@ public class ExportIncrementalTest extends 
ExportImportTestBase {
     ExportService exportService;
 
     @Inject
+    private ImportService importService;
+
+    @Inject
     private AtlasEntityStoreV2 entityStore;
 
     private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
@@ -69,6 +80,15 @@ public class ExportIncrementalTest extends 
ExportImportTestBase {
     private AtlasClassificationType classificationTypeT1;
     private long nextTimestamp;
 
+    private static final String EXPORT_INCREMENTAL = "incremental";
+    private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+    private static final String QUALIFIED_NAME_TABLE_LINEAGE = 
"db_test_1.test_tbl_ctas_2@02052019";
+
+
+    private static final String GUID_DB = 
"f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+    private static final String GUID_TABLE_2 = 
"8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+    private static final String GUID_TABLE_CTAS_2 = 
"eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+
     @BeforeClass
     public void setup() throws IOException, AtlasBaseException {
         basicSetup(typeDefStore, typeRegistry);
@@ -174,6 +194,36 @@ public class ExportIncrementalTest extends 
ExportImportTestBase {
         assertEquals(creationOrder.size(), zipCreationOrder.size());
     }
 
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, 
AtlasBaseException {
+        return getZipSource("hive_db_lineage.zip");
+    }
+
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, 
IOException {
+        runImportWithNoParameters(importService, zipSource);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableInrementalConnected() throws AtlasBaseException {
+        ZipSource source = runExportWithParameters(exportService, 
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 
0, true));
+        verifyExpectedEntities(getFileNames(source), GUID_DB, 
GUID_TABLE_CTAS_2);
+
+        nextTimestamp = updateTimesampForNextIncrementalExport(source);
+
+        try {
+            source = runExportWithParameters(exportService, 
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 
nextTimestamp, true));
+        }catch (SkipException e){
+
+        }
+
+        entityStore.addClassifications(GUID_TABLE_CTAS_2, 
ImmutableList.of(classificationTypeT1.createDefaultValue()));
+
+        source = runExportWithParameters(exportService, 
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 
nextTimestamp, true));
+        verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+    }
+
+
     private AtlasExportRequest getIncrementalRequest(long timestamp) {
         try {
             AtlasExportRequest request = 
TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, 
EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
@@ -193,4 +243,42 @@ public class ExportIncrementalTest extends 
ExportImportTestBase {
         }
     }
 
+    private AtlasExportRequest getExportRequestForHiveTable(String name, 
String fetchType, long changeMarker, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName", 
name));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, changeMarker, 
skipLineage));
+
+        return request;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, long 
changeMarker, boolean skipLineage){
+        Map<String, Object> optionsMap = new HashMap<>();
+        optionsMap.put("fetchType", fetchType.isEmpty() ? "full" : fetchType );
+        optionsMap.put( "changeMarker", changeMarker);
+        optionsMap.put("skipLineage", skipLineage);
+
+        return optionsMap;
+    }
+
+    private void verifyExpectedEntities(List<String> fileNames, String... 
guids){
+        assertEquals(fileNames.size(), guids.length);
+        for (String guid : guids) {
+            assertTrue(fileNames.contains(guid.toLowerCase()));
+        }
+    }
+
+    private List<String> getFileNames(ZipSource zipSource){
+        List<String> ret = new ArrayList<>();
+        assertTrue(zipSource.hasNext());
+
+        while (zipSource.hasNext()){
+            AtlasEntity atlasEntity = zipSource.next();
+            assertNotNull(atlasEntity);
+            ret.add(atlasEntity.getGuid());
+        }
+        return ret;
+    }
 }

Reply via email to