This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 9ee23bf ATLAS-3416 Import API: delete non-exported hive_table for
table level replication
9ee23bf is described below
commit 9ee23bf5aec4cb9c9dc9c3e899d96be99b0c7d3b
Author: nikhilbonte <[email protected]>
AuthorDate: Thu Aug 29 12:47:20 2019 +0530
ATLAS-3416 Import API: delete non-exported hive_table for table level
replication
Signed-off-by: nixonrodrigues <[email protected]>
---
.../atlas/model/impexp/ExportImportAuditEntry.java | 1 +
.../atlas/repository/impexp/AuditsWriter.java | 22 +++
.../atlas/repository/impexp/ImportService.java | 40 ++++-
.../impexp/TableReplicationRequestProcessor.java | 180 +++++++++++++++++++++
.../atlas/repository/impexp/ImportServiceTest.java | 81 +++++++++-
.../TableReplicationRequestProcessorTest.java | 133 +++++++++++++++
6 files changed, 449 insertions(+), 8 deletions(-)
diff --git
a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
index 6fff6be..a07b56f 100644
---
a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
+++
b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
@@ -37,6 +37,7 @@ public class ExportImportAuditEntry extends
AtlasBaseModelObject implements Seri
private static final long serialVersionUID = 1L;
public static final String OPERATION_EXPORT = "EXPORT";
public static final String OPERATION_IMPORT = "IMPORT";
+ public static final String OPERATION_IMPORT_DELETE_REPL =
"IMPORT_DELETE_REPL";
private String userName;
private String operation;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 612b403..d57cb88 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -44,6 +44,7 @@ import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
@Component
public class AuditsWriter {
@@ -68,6 +69,10 @@ public class AuditsWriter {
this.auditService = auditService;
}
+ public AtlasServerService getAtlasServerService() {
+ return atlasServerService;
+ }
+
public void write(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityCreationOrder) throws
AtlasBaseException {
@@ -80,6 +85,12 @@ public class AuditsWriter {
auditForImport.add(userName, result, startTime, endTime,
entityCreationOrder);
}
+ public void write(String userName, String sourceCluster,
+ long startTime, long endTime,
+ Set<String> entityCreationOrder) throws
AtlasBaseException {
+ auditForImport.add(userName, sourceCluster, startTime, endTime,
entityCreationOrder);
+ }
+
private void updateReplicationAttribute(boolean isReplicationSet,
String serverName, String
serverFullName,
List<String> exportedGuids,
@@ -240,5 +251,16 @@ public class AuditsWriter {
updateReplicationAttribute(replicationOptionState,
sourceServerName, sourceServerFullName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM,
result.getExportResult().getChangeMarker());
}
+
+ public void add(String userName, String sourceCluster, long startTime,
+ long endTime, Set<String> entityGuids) throws
AtlasBaseException {
+
+ sourceServerName = getServerNameFromFullName(sourceCluster);
+ auditService.add(userName,
+ sourceServerName, getCurrentClusterName(),
+ ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL,
+ AtlasType.toJson(entityGuids), startTime, endTime,
!entityGuids.isEmpty());
+
+ }
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 90e9f5d..b9ace04 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -23,8 +23,10 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.entitytransform.BaseEntityHandler;
import org.apache.atlas.entitytransform.TransformerContext;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
@@ -53,24 +55,28 @@ import static
org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
public class ImportService {
private static final Logger LOG =
LoggerFactory.getLogger(ImportService.class);
+ private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry;
private final BulkImporter bulkImporter;
private final AuditsWriter auditsWriter;
private final ImportTransformsShaper importTransformsShaper;
+ private TableReplicationRequestProcessor tableReplicationRequestProcessor;
+
private long startTimestamp;
private long endTimestamp;
@Inject
public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry
typeRegistry, BulkImporter bulkImporter,
- AuditsWriter auditsWriter,
- ImportTransformsShaper importTransformsShaper) {
+ AuditsWriter auditsWriter, ImportTransformsShaper
importTransformsShaper,
+ TableReplicationRequestProcessor
tableReplicationRequestProcessor) {
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.bulkImporter = bulkImporter;
this.auditsWriter = auditsWriter;
this.importTransformsShaper = importTransformsShaper;
+ this.tableReplicationRequestProcessor =
tableReplicationRequestProcessor;
}
public AtlasImportResult run(InputStream inputStream, String userName,
@@ -106,7 +112,11 @@ public class ImportService {
startTimestamp = System.currentTimeMillis();
processTypes(source.getTypesDef(), result);
setStartPosition(request, source);
+
processEntities(userName, source, result);
+
+ processReplicationDeletion(source.getExportResult().getRequest(),
request, userName);
+
} catch (AtlasBaseException excp) {
LOG.error("import(user={}, from={}): failed", userName,
requestingIP, excp);
@@ -223,6 +233,12 @@ public class ImportService {
auditsWriter.write(userName, result, startTimestamp, endTimestamp,
importSource.getCreationOrder());
}
+ private void processReplicationDeletion(AtlasExportRequest exportRequest,
AtlasImportRequest importRequest, String userName) throws AtlasBaseException {
+ if (checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest)) {
+ tableReplicationRequestProcessor.process(exportRequest,
importRequest, userName);
+ }
+ }
+
private int getDuration(long endTime, long startTime) {
return (int) (endTime - startTime);
}
@@ -234,9 +250,25 @@ public class ImportService {
}
return new ZipSourceWithBackingDirectory(inputStream,
configuredTemporaryDirectory);
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
throw new AtlasBaseException(ex);
}
}
+
+ @VisibleForTesting
+ boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest
importRequest, AtlasExportRequest exportRequest) {
+ if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+ return false;
+ }
+
+ for (AtlasObjectId itemToExport : exportRequest.getItemsToExport()) {
+ if
(!itemToExport.getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE)){
+ return false;
+ }
+ }
+
+ return importRequest.isReplicationOptionSet() &&
exportRequest.isReplicationOptionSet() &&
+
exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL)
&&
+ exportRequest.getSkipLineageOptionValue();
+ }
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
new file mode 100644
index 0000000..397be16
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+@Component
+public class TableReplicationRequestProcessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TableReplicationRequestProcessor.class);
+
+ private static final String QUERY_DB_NAME_EQUALS= "where db.name='%s'";
+ private static final String ATTR_NAME_KEY = "name";
+ private static final String TYPE_HIVE_TABLE = "hive_table";
+ private static final String ATTR_QUALIFIED_NAME_KEY = "qualifiedName";
+ private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+ private long startTstamp;
+ private long endTstamp;
+ private AuditsWriter auditsWriter;
+ private AtlasEntityStore entityStore;
+ private AtlasTypeRegistry typeRegistry;
+ private AtlasDiscoveryService discoveryService;
+
+ @Inject
+ public TableReplicationRequestProcessor(AuditsWriter auditsWriter,
AtlasEntityStore entityStore,
+ AtlasDiscoveryService
atlasDiscoveryService, AtlasTypeRegistry typeRegistry) {
+ this.auditsWriter = auditsWriter;
+ this.entityStore = entityStore;
+ this.typeRegistry = typeRegistry;
+ this.discoveryService = atlasDiscoveryService;
+ }
+
+ public void process(AtlasExportRequest exportRequest, AtlasImportRequest
importRequest, String userName) throws AtlasBaseException {
+ startTstamp = System.currentTimeMillis();
+ LOG.info("process: deleting entities with type hive_table which are
not imported.");
+ String sourceCluster = importRequest.getOptionKeyReplicatedFrom();
+
+ List<String> qualifiedNames =
getQualifiedNamesFromRequest(exportRequest);
+
+ List<String> safeGUIDs = getEntitiesFromQualifiedNames(qualifiedNames);
+
+ String dbName = getDbName(safeGUIDs.get(0));
+
+ Set<String> guidsToDelete = getGuidsToDelete(dbName, safeGUIDs,
sourceCluster);
+
+ deleteTables(sourceCluster, guidsToDelete, userName);
+ }
+
+ private List<String> getQualifiedNamesFromRequest(AtlasExportRequest
exportRequest){
+ List<String> qualifiedNames = new ArrayList<>();
+
+ for (AtlasObjectId objectId : exportRequest.getItemsToExport()) {
+
qualifiedNames.add(objectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME_KEY).toString());
+ }
+ return qualifiedNames;
+ }
+
+ private List<String> getEntitiesFromQualifiedNames(List<String>
qualifiedNames) throws AtlasBaseException {
+
+ List<String> safeGUIDs = new ArrayList<>();
+ for(Object qualifiedName : qualifiedNames) {
+ String guid =
getGuidByUniqueAttributes(Collections.singletonMap(ATTR_QUALIFIED_NAME_KEY,
qualifiedName));
+ safeGUIDs.add(guid);
+ }
+ return safeGUIDs;
+ }
+
+ private String getGuidByUniqueAttributes(Map<String, Object>
uniqueAttributes) throws AtlasBaseException {
+ return
entityStore.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(TYPE_HIVE_TABLE),
uniqueAttributes);
+ }
+
+ private String getDbName(String tableGuid) throws AtlasBaseException {
+ String dbGuid = AuditsWriter.ReplKeyGuidFinder.get(typeRegistry,
entityStore, tableGuid);
+ return (String)
entityStore.getById(dbGuid).getEntity().getAttribute(ATTR_NAME_KEY);
+ }
+
+ private Set<String> getGuidsToDelete(String dbName, List<String>
excludeGUIDs, String sourceCluster) throws AtlasBaseException {
+
+ SearchParameters parameters = getSearchParameters(dbName,
sourceCluster);
+ Set<String> unsafeGUIDs = new HashSet<>();
+
+ final int max = 10000;
+ int fetchedSize = 0;
+ int i = 0;
+ parameters.setLimit(max);
+
+ while (fetchedSize == (max * i)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("i={}, fetchedSize={}, unsafeGUIDs.size()={}", i,
fetchedSize, unsafeGUIDs.size());
+ }
+
+ int offset = max * i;
+ parameters.setOffset(offset);
+
+ AtlasSearchResult searchResult =
discoveryService.searchWithParameters(parameters);
+
+ if (CollectionUtils.isEmpty(searchResult.getEntities())) {
+ break;
+ }
+
+ for (AtlasEntityHeader entityHeader : searchResult.getEntities()) {
+ String guid = entityHeader.getGuid();
+ if (!excludeGUIDs.contains(guid)) {
+ unsafeGUIDs.add(guid);
+ }
+ }
+ fetchedSize = searchResult.getEntities().size();
+ i++;
+ }
+ return unsafeGUIDs;
+ }
+
+ private SearchParameters getSearchParameters(String dbName, String
sourceCluster) {
+ String query = String.format(QUERY_DB_NAME_EQUALS, dbName);
+
+ SearchParameters parameters = new SearchParameters();
+ parameters.setExcludeDeletedEntities(false);
+ parameters.setTypeName(TYPE_HIVE_TABLE);
+ parameters.setExcludeDeletedEntities(true);
+
+ parameters.setClassification(String.format(REPLICATED_TAG_NAME,
sourceCluster));
+ parameters.setAttributes(new HashSet<String>(){{
add(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); }});
+ parameters.setQuery(query);
+
+ return parameters;
+ }
+
+ private void deleteTables(String sourceCluster, Set<String> guidsToDelete,
String userName) throws AtlasBaseException {
+ if (!CollectionUtils.isEmpty(guidsToDelete)) {
+ entityStore.deleteByIds(new ArrayList<>(guidsToDelete));
+
+ endTstamp = System.currentTimeMillis();
+ createAuditEntry(sourceCluster, guidsToDelete, userName);
+ }
+ }
+
+ private void createAuditEntry(String sourceCluster, Set<String>
guidsToDelete, String userName) throws AtlasBaseException {
+ auditsWriter.write(userName, sourceCluster, startTstamp, endTstamp,
guidsToDelete);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleted entities => {}", guidsToDelete);
+ }
+ }
+}
diff --git
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 2aee233..52b509f 100644
---
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -23,11 +23,14 @@ import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.mockito.invocation.InvocationOnMock;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.commons.lang.StringUtils;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,25 +40,33 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParametersUsingBackingDirectory;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
+import static
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
+import static
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_SKIP_LINEAGE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest extends ExportImportTestBase {
@@ -215,7 +226,7 @@ public class ImportServiceTest extends ExportImportTestBase
{
@Test
public void importServiceProcessesIOException() {
- ImportService importService = new ImportService(typeDefStore,
typeRegistry, null, null,null);
+ ImportService importService = new ImportService(typeDefStore,
typeRegistry, null,null, null,null);
AtlasImportRequest req = mock(AtlasImportRequest.class);
Answer<Map> answer = new Answer<Map>() {
@@ -281,6 +292,68 @@ public class ImportServiceTest extends
ExportImportTestBase {
@Test(expectedExceptions = AtlasBaseException.class)
public void importEmptyZip() throws IOException, AtlasBaseException {
- new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
+ new ZipSource(getInputStreamFrom("empty.zip"));
+ }
+
+ @Test
+ public void testCheckHiveTableIncrementalSkipLineage() {
+ AtlasImportRequest importRequest;
+ AtlasExportRequest exportRequest;
+
+ importRequest = getImportRequest("cl1");
+ exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true,
getItemsToExport("hive_table", "hive_table"));
+
assertTrue(importService.checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest));
+
+ exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true,
getItemsToExport("hive_table", "hive_db", "hive_table"));
+
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest));
+
+ exportRequest = getExportRequest(FETCH_TYPE_FULL, "cl2", true,
getItemsToExport("hive_table", "hive_table"));
+
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest));
+
+ exportRequest = getExportRequest(FETCH_TYPE_FULL, "", true,
getItemsToExport("hive_table", "hive_table"));
+
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest));
+
+ importRequest = getImportRequest("");
+ exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true,
getItemsToExport("hive_table", "hive_table"));
+
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest,
exportRequest));
+ }
+
+ private AtlasImportRequest getImportRequest(String replicatedFrom){
+ AtlasImportRequest importRequest = getDefaultImportRequest();
+
+ if (!StringUtils.isEmpty(replicatedFrom)) {
+
importRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM,
replicatedFrom);
+ }
+ return importRequest;
+ }
+
+ private AtlasExportRequest getExportRequest(String fetchType, String
replicatedTo, boolean skipLineage, List<AtlasObjectId> itemsToExport){
+ AtlasExportRequest request = new AtlasExportRequest();
+
+ request.setOptions(getOptionsMap(fetchType, replicatedTo,
skipLineage));
+ request.setItemsToExport(itemsToExport);
+ return request;
+ }
+
+ private List<AtlasObjectId> getItemsToExport(String... typeNames){
+ List<AtlasObjectId> itemsToExport = new ArrayList<>();
+ for (String typeName : typeNames) {
+ itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName",
"db.table@cluster"));
+ }
+ return itemsToExport;
+ }
+
+ private Map<String, Object> getOptionsMap(String fetchType, String
replicatedTo, boolean skipLineage){
+ Map<String, Object> options = new HashMap<>();
+
+ if (!StringUtils.isEmpty(fetchType)) {
+ options.put(OPTION_FETCH_TYPE, fetchType);
+ }
+ if (!StringUtils.isEmpty(replicatedTo)) {
+ options.put(OPTION_KEY_REPLICATED_TO, replicatedTo);
+ }
+ options.put(OPTION_SKIP_LINEAGE, skipLineage);
+
+ return options;
}
}
diff --git
a/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
new file mode 100644
index 0000000..9d6e616
--- /dev/null
+++
b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.SkipException;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TableReplicationRequestProcessorTest extends ExportImportTestBase
{
+ private static final Logger LOG =
LoggerFactory.getLogger(TableReplicationRequestProcessorTest.class);
+
+ private static final String ENTITY_GUID_REPLICATED =
"718a6d12-35a8-4731-aff8-3a64637a43a3";
+ private static final String ENTITY_GUID_NOT_REPLICATED_1 =
"e19e5683-d9ae-436a-af1e-0873582d0f1e";
+ private static final String ENTITY_GUID_NOT_REPLICATED_2 =
"2e28ae34-576e-4a8b-be48-cf5f925d7b15";
+ private static final String REPL_FROM = "cl1";
+ private static final String REPL_TRANSFORMER =
"[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+ "\"action\":{\"__entity\":\"ADD_CLASSIFICATION:
cl1_replicated\"}}," +
+
"{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}},"
+
+ "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS:
cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+ "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE:
file:///\"}," +
+ "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: =
:file:///=file:///\"}}," +
+
"{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE:
file:///\"}," +
+ "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: =
:file:///=file:///\"}}]";
+
+ @Inject
+ private ImportService importService;
+
+ @Inject
+ private AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasEntityStore entityStore;
+
+ @Inject
+ private ExportImportAuditService auditService;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @BeforeTest
+ public void setupTest() throws IOException, AtlasBaseException {
+ RequestContextV1.clear();
+ RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
+ basicSetup(typeDefStore, typeRegistry);
+ }
+
+ @DataProvider(name = "source1")
+ public static Object[][] getData1(ITestContext context) throws
IOException, AtlasBaseException {
+ return getZipSource("repl_exp_1.zip");
+ }
+
+ public static InputStream getData2() {
+ return getInputStreamFrom("repl_exp_2.zip");
+ }
+
+ @Test(dataProvider = "source1")
+ public void importWithIsReplTrue(InputStream zipSource) throws
AtlasBaseException, IOException {
+ AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+ atlasImportRequest.setOption("replicatedFrom", REPL_FROM);
+ atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
+
+ runImportWithParameters(importService, atlasImportRequest, zipSource);
+
+ runImportWithParameters(importService, atlasImportRequest, getData2());
+
+ assertAuditEntry();
+ }
+
+ private void assertAuditEntry() {
+ pauseForIndexCreation();
+ List<ExportImportAuditEntry> result;
+ try {
+ result = auditService.get("", "IMPORT_DELETE_REPL", "", "", "",
10, 0);
+ } catch (Exception e) {
+ throw new SkipException("audit entries not retrieved.");
+ }
+
+ assertNotNull(result);
+ assertTrue(result.size() > 0);
+
+ List<String> deletedGuids =
AtlasType.fromJson(result.get(0).getResultSummary(), List.class);
+ assertNotNull(deletedGuids);
+ assertFalse(deletedGuids.contains(ENTITY_GUID_REPLICATED));
+ assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_1));
+ assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_2));
+ }
+}