This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c3a932f ATLAS-3324 Incremental export with hive_table for table-level replication.
c3a932f is described below
commit c3a932fb5bed7589e151607b430f568762eb2b35
Author: nikhilbonte <[email protected]>
AuthorDate: Tue Jul 9 18:51:42 2019 +0530
ATLAS-3324 Incremental export with hive_table for table-level replication.
Signed-off-by: nixonrodrigues <[email protected]>
(cherry picked from commit 8ada5d400ea994a0ffd3c67fa0f66719de1e4138)
---
.../atlas/repository/impexp/EntitiesExtractor.java | 3 +
.../atlas/repository/impexp/ExportService.java | 20 ++++-
.../impexp/IncrementalExportEntityProvider.java | 9 ++-
.../repository/impexp/ExportIncrementalTest.java | 88 ++++++++++++++++++++++
4 files changed, 118 insertions(+), 2 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
index 15cb111..da5cf37 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -55,6 +55,9 @@ public class EntitiesExtractor {
if (context.isHiveDBIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity,
context);
break;
+ } else if (context.isHiveTableIncrementalSkipLineage()) {
+ extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity,
context);
+ break;
}
case FULL:
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 5055607..6016723 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -36,6 +36,7 @@ import
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -332,6 +333,7 @@ public class ExportService {
static class ExportContext {
private static final int REPORTING_THREASHOLD = 1000;
private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+ private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
final UniqueList<String> entityCreationOrder = new
UniqueList<>();
@@ -353,6 +355,7 @@ public class ExportService {
final long changeMarker;
boolean isSkipConnectedFetch;
private final boolean isHiveDBIncremental;
+ private final boolean isHiveTableIncremental;
private int progressReportCount = 0;
@@ -364,11 +367,12 @@ public class ExportService {
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker =
result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental =
checkHiveDBIncrementalSkipLineage(result.getRequest());
+ this.isHiveTableIncremental =
checkHiveTableIncrementalSkipLineage(result.getRequest());
this.isSkipConnectedFetch = false;
}
private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest
request) {
- if(request.getItemsToExport().size() == 0) {
+ if(CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
@@ -377,6 +381,16 @@ public class ExportService {
request.getSkipLineageOptionValue();
}
+ /**
+  * Decides whether this export is a table-level incremental export that skips lineage:
+  * the first item to export is a hive_table, the fetch type is incremental, and the
+  * skipLineage option is set.
+  *
+  * @param request the export request this context is built for
+  * @return true only when all three conditions hold; false when there are no items to export
+  */
+ private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
+     if (CollectionUtils.isEmpty(request.getItemsToExport())) {
+         return false;
+     }
+
+     // Constant-first comparisons: an item's typeName may be null (e.g. an item identified
+     // only by guid), and dereferencing it here would throw an NPE from the ExportContext
+     // constructor and fail the whole export instead of simply answering "no".
+     return ATLAS_TYPE_HIVE_TABLE.equalsIgnoreCase(request.getItemsToExport().get(0).getTypeName()) &&
+            AtlasExportRequest.FETCH_TYPE_INCREMENTAL.equalsIgnoreCase(request.getFetchTypeOptionValue()) &&
+            request.getSkipLineageOptionValue();
+ }
+
public List<AtlasEntity>
getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
if(fetchType != ExportFetchType.INCREMENTAL) {
return new ArrayList<>();
@@ -442,6 +456,10 @@ public class ExportService {
return isHiveDBIncremental;
}
+ /**
+  * @return whether the request was classified, when this context was constructed, as an
+  *         incremental hive_table export with lineage skipped
+  */
+ public boolean isHiveTableIncrementalSkipLineage() {
+     return this.isHiveTableIncremental;
+ }
+
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
index 256d9de..be07b8b 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -48,6 +48,10 @@ public class IncrementalExportEntityProvider implements
ExtractStrategy {
private static final String TRANSFORM_CLAUSE =
".project('__guid').by('__guid').dedup().toList()";
private static final String TIMESTAMP_CLAUSE =
".has('__modificationTimestamp', gt(modificationTimestamp))";
+ // Traversals used by connectedFetch() when the starting entity is a hive_table: each one
+ // extends QUERY_DB (declared outside this hunk) along one outgoing edge of the table
+ // vertex: its db, its storage descriptor (sd), and its columns.
+ // NOTE(review): the QUERY_DB name suggests a hive_db start vertex; here it is reused with a
+ // hive_table guid, so presumably it only selects the start vertex by guid. Confirm.
+ private static final String QUERY_TABLE_DB = QUERY_DB +
".out('__hive_table.db')";
+ private static final String QUERY_TABLE_SD = QUERY_DB +
".out('__hive_table.sd')";
+ private static final String QUERY_TABLE_COLUMNS = QUERY_DB +
".out('__hive_table.columns')";
+
private ScriptEngine scriptEngine;
@Inject
@@ -67,7 +71,10 @@ public class IncrementalExportEntityProvider implements
ExtractStrategy {
@Override
public void connectedFetch(AtlasEntity entity, ExportService.ExportContext
context) {
-
+ //starting entity is hive_table
+ context.guidsToProcess.addAll(fetchGuids(entity.getGuid(),
QUERY_TABLE_DB, context.changeMarker));
+ context.guidsToProcess.addAll(fetchGuids(entity.getGuid(),
QUERY_TABLE_SD, context.changeMarker));
+ context.guidsToProcess.addAll(fetchGuids(entity.getGuid(),
QUERY_TABLE_COLUMNS, context.changeMarker));
}
public void populate(String dbEntityGuid, long timeStamp,
UniqueList<String> guidsToProcess) {
diff --git
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 1d44081..7aeb6a7 100644
---
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -26,29 +26,37 @@ import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
+import org.testng.ITestContext;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
+import org.testng.annotations.DataProvider;
import javax.inject.Inject;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends ExportImportTestBase {
@@ -62,6 +70,9 @@ public class ExportIncrementalTest extends
ExportImportTestBase {
ExportService exportService;
@Inject
+ private ImportService importService;
+
+ @Inject
private AtlasEntityStoreV2 entityStore;
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
@@ -69,6 +80,15 @@ public class ExportIncrementalTest extends
ExportImportTestBase {
private AtlasClassificationType classificationTypeT1;
private long nextTimestamp;
+ // fetchType option value passed to getExportRequestForHiveTable for incremental exports.
+ private static final String EXPORT_INCREMENTAL = "incremental";
+ // NOTE(review): the names and guids below presumably refer to entities in the
+ // hive_db_lineage.zip fixture imported by importHiveDb. Confirm against the fixture.
+ // QUALIFIED_NAME_DB and GUID_TABLE_2 are not referenced in the visible part of this patch.
+ private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
+ private static final String QUALIFIED_NAME_TABLE_LINEAGE =
"db_test_1.test_tbl_ctas_2@02052019";
+
+
+ private static final String GUID_DB =
"f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+ private static final String GUID_TABLE_2 =
"8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+ private static final String GUID_TABLE_CTAS_2 =
"eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
@@ -174,6 +194,36 @@ public class ExportIncrementalTest extends
ExportImportTestBase {
assertEquals(creationOrder.size(), zipCreationOrder.size());
}
+ /**
+  * Data provider "hiveDb": supplies the hive_db_lineage.zip fixture to importHiveDb.
+  */
+ @DataProvider(name = "hiveDb")
+ public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+     final Object[][] hiveDbZip = getZipSource("hive_db_lineage.zip");
+     return hiveDbZip;
+ }
+
+ /**
+  * Imports the hive_db_lineage.zip fixture (no import options) so that the table-level
+  * incremental export test has entities to export.
+  */
+ @Test(dataProvider = "hiveDb")
+ public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+     runImportWithNoParameters(this.importService, zipSource);
+ }
+
+ /**
+  * Table-level incremental export with skipLineage. The first export (change marker 0) must
+  * yield exactly the db and the table. After a classification is added to the table, an
+  * export from the next change marker must yield only the table.
+  */
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableInrementalConnected() throws AtlasBaseException {
+     ZipSource source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
+     verifyExpectedEntities(getFileNames(source), GUID_DB, GUID_TABLE_CTAS_2);
+
+     nextTimestamp = updateTimesampForNextIncrementalExport(source);
+
+     // Export again before anything has changed. The result is not inspected, so it is not
+     // kept. A SkipException from this call is tolerated on purpose rather than failing the test.
+     // NOTE(review): presumably an export with no modified entities is allowed to fail here.
+     // Confirm, and assert the expected outcome instead of ignoring it.
+     try {
+         runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+     } catch (SkipException ignored) {
+         // Deliberately ignored; see the comment above.
+     }
+
+     entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT1.createDefaultValue()));
+
+     source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, true));
+     verifyExpectedEntities(getFileNames(source), GUID_TABLE_CTAS_2);
+ }
+
+
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request =
TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR,
EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
@@ -193,4 +243,42 @@ public class ExportIncrementalTest extends
ExportImportTestBase {
}
}
+ private AtlasExportRequest getExportRequestForHiveTable(String name,
String fetchType, long changeMarker, boolean skipLineage) {
+ AtlasExportRequest request = new AtlasExportRequest();
+
+ List<AtlasObjectId> itemsToExport = new ArrayList<>();
+ itemsToExport.add(new AtlasObjectId("hive_table", "qualifiedName",
name));
+ request.setItemsToExport(itemsToExport);
+ request.setOptions(getOptionsMap(fetchType, changeMarker,
skipLineage));
+
+ return request;
+ }
+
+ /**
+  * Builds the export options map (fetchType, changeMarker, skipLineage).
+  * A null or empty fetchType falls back to "full" instead of throwing.
+  */
+ private Map<String, Object> getOptionsMap(String fetchType, long changeMarker, boolean skipLineage) {
+     Map<String, Object> optionsMap = new HashMap<>();
+     optionsMap.put("fetchType", (fetchType == null || fetchType.isEmpty()) ? "full" : fetchType);
+     optionsMap.put("changeMarker", changeMarker);
+     optionsMap.put("skipLineage", skipLineage);
+
+     return optionsMap;
+ }
+
+ /**
+  * Asserts that the export produced the same number of entities as expected and that every
+  * expected guid (lower-cased) is among them. Failure messages include the actual guids.
+  */
+ private void verifyExpectedEntities(List<String> fileNames, String... guids) {
+     assertEquals(fileNames.size(), guids.length, "unexpected number of exported entities: " + fileNames);
+     for (String guid : guids) {
+         assertTrue(fileNames.contains(guid.toLowerCase()), "expected guid " + guid + " not found in export: " + fileNames);
+     }
+ }
+
+ /**
+  * Drains the zip source and returns the guids of its entities in iteration order.
+  * Fails if the source is empty or yields a null entity.
+  */
+ private List<String> getFileNames(ZipSource zipSource) {
+     assertTrue(zipSource.hasNext());
+
+     List<String> guids = new ArrayList<>();
+     while (zipSource.hasNext()) {
+         AtlasEntity exported = zipSource.next();
+         assertNotNull(exported);
+         guids.add(exported.getGuid());
+     }
+     return guids;
+ }
}