This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 9ee23bf  ATLAS-3416 Import API: delete non-exported hive_table for 
table level replication
9ee23bf is described below

commit 9ee23bf5aec4cb9c9dc9c3e899d96be99b0c7d3b
Author: nikhilbonte <[email protected]>
AuthorDate: Thu Aug 29 12:47:20 2019 +0530

    ATLAS-3416 Import API: delete non-exported hive_table for table level 
replication
    
    Signed-off-by: nixonrodrigues <[email protected]>
---
 .../atlas/model/impexp/ExportImportAuditEntry.java |   1 +
 .../atlas/repository/impexp/AuditsWriter.java      |  22 +++
 .../atlas/repository/impexp/ImportService.java     |  40 ++++-
 .../impexp/TableReplicationRequestProcessor.java   | 180 +++++++++++++++++++++
 .../atlas/repository/impexp/ImportServiceTest.java |  81 +++++++++-
 .../TableReplicationRequestProcessorTest.java      | 133 +++++++++++++++
 6 files changed, 449 insertions(+), 8 deletions(-)

diff --git 
a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java 
b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
index 6fff6be..a07b56f 100644
--- 
a/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
+++ 
b/intg/src/main/java/org/apache/atlas/model/impexp/ExportImportAuditEntry.java
@@ -37,6 +37,7 @@ public class ExportImportAuditEntry extends 
AtlasBaseModelObject implements Seri
     private static final long serialVersionUID = 1L;
     public static final String OPERATION_EXPORT = "EXPORT";
     public static final String OPERATION_IMPORT = "IMPORT";
+    public static final String OPERATION_IMPORT_DELETE_REPL = 
"IMPORT_DELETE_REPL";
 
     private String userName;
     private String operation;
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 612b403..d57cb88 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -44,6 +44,7 @@ import org.springframework.util.CollectionUtils;
 import javax.inject.Inject;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 @Component
 public class AuditsWriter {
@@ -68,6 +69,10 @@ public class AuditsWriter {
         this.auditService = auditService;
     }
 
+    public AtlasServerService getAtlasServerService() {
+        return atlasServerService;
+    }
+
     public void write(String userName, AtlasExportResult result,
                       long startTime, long endTime,
                       List<String> entityCreationOrder) throws 
AtlasBaseException {
@@ -80,6 +85,12 @@ public class AuditsWriter {
         auditForImport.add(userName, result, startTime, endTime, 
entityCreationOrder);
     }
 
+    public void write(String userName, String sourceCluster,
+                      long startTime, long endTime,
+                      Set<String> entityCreationOrder) throws 
AtlasBaseException {
+        auditForImport.add(userName, sourceCluster, startTime, endTime, 
entityCreationOrder);
+    }
+
     private void updateReplicationAttribute(boolean isReplicationSet,
                                             String serverName, String 
serverFullName,
                                             List<String> exportedGuids,
@@ -240,5 +251,16 @@ public class AuditsWriter {
             updateReplicationAttribute(replicationOptionState, 
sourceServerName, sourceServerFullName, entityGuids,
                     Constants.ATTR_NAME_REPLICATED_FROM, 
result.getExportResult().getChangeMarker());
         }
+
+        public void add(String userName, String sourceCluster, long startTime,
+                        long endTime, Set<String> entityGuids) throws 
AtlasBaseException {
+
+            sourceServerName = getServerNameFromFullName(sourceCluster);
+            auditService.add(userName,
+                    sourceServerName, getCurrentClusterName(),
+                    ExportImportAuditEntry.OPERATION_IMPORT_DELETE_REPL,
+                    AtlasType.toJson(entityGuids), startTime, endTime, 
!entityGuids.isEmpty());
+
+        }
     }
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 90e9f5d..b9ace04 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -23,8 +23,10 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.entitytransform.BaseEntityHandler;
 import org.apache.atlas.entitytransform.TransformerContext;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
@@ -53,24 +55,28 @@ import static 
org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
 public class ImportService {
     private static final Logger LOG = 
LoggerFactory.getLogger(ImportService.class);
 
+    private static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
     private final AtlasTypeDefStore typeDefStore;
     private final AtlasTypeRegistry typeRegistry;
     private final BulkImporter bulkImporter;
     private final AuditsWriter auditsWriter;
     private final ImportTransformsShaper importTransformsShaper;
 
+    private TableReplicationRequestProcessor tableReplicationRequestProcessor;
+
     private long startTimestamp;
     private long endTimestamp;
 
     @Inject
     public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry 
typeRegistry, BulkImporter bulkImporter,
-                         AuditsWriter auditsWriter,
-                         ImportTransformsShaper importTransformsShaper) {
+                         AuditsWriter auditsWriter, ImportTransformsShaper 
importTransformsShaper,
+                         TableReplicationRequestProcessor 
tableReplicationRequestProcessor) {
         this.typeDefStore = typeDefStore;
         this.typeRegistry = typeRegistry;
         this.bulkImporter = bulkImporter;
         this.auditsWriter = auditsWriter;
         this.importTransformsShaper = importTransformsShaper;
+        this.tableReplicationRequestProcessor = 
tableReplicationRequestProcessor;
     }
 
     public AtlasImportResult run(InputStream inputStream, String userName,
@@ -106,7 +112,11 @@ public class ImportService {
             startTimestamp = System.currentTimeMillis();
             processTypes(source.getTypesDef(), result);
             setStartPosition(request, source);
+
             processEntities(userName, source, result);
+
+            processReplicationDeletion(source.getExportResult().getRequest(), 
request, userName);
+
         } catch (AtlasBaseException excp) {
             LOG.error("import(user={}, from={}): failed", userName, 
requestingIP, excp);
 
@@ -223,6 +233,12 @@ public class ImportService {
         auditsWriter.write(userName, result, startTimestamp, endTimestamp, 
importSource.getCreationOrder());
     }
 
+    private void processReplicationDeletion(AtlasExportRequest exportRequest, 
AtlasImportRequest importRequest, String userName) throws AtlasBaseException {
+        if (checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest)) {
+            tableReplicationRequestProcessor.process(exportRequest, 
importRequest, userName);
+        }
+    }
+
     private int getDuration(long endTime, long startTime) {
         return (int) (endTime - startTime);
     }
@@ -234,9 +250,25 @@ public class ImportService {
             }
 
             return new ZipSourceWithBackingDirectory(inputStream, 
configuredTemporaryDirectory);
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
             throw new AtlasBaseException(ex);
         }
     }
+
+    @VisibleForTesting
+    boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest 
importRequest, AtlasExportRequest exportRequest) {
+        if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+            return false;
+        }
+
+        for (AtlasObjectId itemToExport : exportRequest.getItemsToExport()) {
+            if 
(!itemToExport.getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE)){
+                return false;
+            }
+        }
+
+        return importRequest.isReplicationOptionSet() && 
exportRequest.isReplicationOptionSet() &&
+                
exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL)
 &&
+                exportRequest.getSkipLineageOptionValue();
+    }
 }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
new file mode 100644
index 0000000..397be16
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessor.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+@Component
+public class TableReplicationRequestProcessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableReplicationRequestProcessor.class);
+
+    private static final String QUERY_DB_NAME_EQUALS= "where db.name='%s'";
+    private static final String ATTR_NAME_KEY = "name";
+    private static final String TYPE_HIVE_TABLE = "hive_table";
+    private static final String ATTR_QUALIFIED_NAME_KEY = "qualifiedName";
+    private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+    private long startTstamp;
+    private long endTstamp;
+    private AuditsWriter auditsWriter;
+    private AtlasEntityStore entityStore;
+    private AtlasTypeRegistry typeRegistry;
+    private AtlasDiscoveryService discoveryService;
+
+    @Inject
+    public TableReplicationRequestProcessor(AuditsWriter auditsWriter, 
AtlasEntityStore entityStore,
+                                            AtlasDiscoveryService 
atlasDiscoveryService, AtlasTypeRegistry typeRegistry) {
+        this.auditsWriter = auditsWriter;
+        this.entityStore = entityStore;
+        this.typeRegistry = typeRegistry;
+        this.discoveryService = atlasDiscoveryService;
+    }
+
+    public void process(AtlasExportRequest exportRequest, AtlasImportRequest 
importRequest, String userName) throws AtlasBaseException {
+        startTstamp = System.currentTimeMillis();
+        LOG.info("process: deleting entities with type hive_table which are 
not imported.");
+        String sourceCluster = importRequest.getOptionKeyReplicatedFrom();
+
+        List<String> qualifiedNames = 
getQualifiedNamesFromRequest(exportRequest);
+
+        List<String> safeGUIDs = getEntitiesFromQualifiedNames(qualifiedNames);
+
+        String dbName = getDbName(safeGUIDs.get(0));
+
+        Set<String> guidsToDelete = getGuidsToDelete(dbName, safeGUIDs, 
sourceCluster);
+
+        deleteTables(sourceCluster, guidsToDelete, userName);
+    }
+
+    private List<String> getQualifiedNamesFromRequest(AtlasExportRequest 
exportRequest){
+        List<String> qualifiedNames = new ArrayList<>();
+
+        for (AtlasObjectId objectId : exportRequest.getItemsToExport()) {
+            
qualifiedNames.add(objectId.getUniqueAttributes().get(ATTR_QUALIFIED_NAME_KEY).toString());
+        }
+        return qualifiedNames;
+    }
+
+    private List<String> getEntitiesFromQualifiedNames(List<String> 
qualifiedNames) throws AtlasBaseException {
+
+        List<String> safeGUIDs = new ArrayList<>();
+        for(Object qualifiedName : qualifiedNames) {
+            String guid = 
getGuidByUniqueAttributes(Collections.singletonMap(ATTR_QUALIFIED_NAME_KEY, 
qualifiedName));
+            safeGUIDs.add(guid);
+        }
+        return safeGUIDs;
+    }
+
+    private String getGuidByUniqueAttributes(Map<String, Object> 
uniqueAttributes) throws AtlasBaseException {
+        return 
entityStore.getGuidByUniqueAttributes(typeRegistry.getEntityTypeByName(TYPE_HIVE_TABLE),
 uniqueAttributes);
+    }
+
+    private String getDbName(String tableGuid) throws AtlasBaseException {
+        String dbGuid = AuditsWriter.ReplKeyGuidFinder.get(typeRegistry, 
entityStore, tableGuid);
+        return (String) 
entityStore.getById(dbGuid).getEntity().getAttribute(ATTR_NAME_KEY);
+    }
+
+    private Set<String> getGuidsToDelete(String dbName, List<String> 
excludeGUIDs, String sourceCluster) throws AtlasBaseException {
+
+        SearchParameters parameters = getSearchParameters(dbName, 
sourceCluster);
+        Set<String> unsafeGUIDs = new HashSet<>();
+
+        final int max = 10000;
+        int fetchedSize = 0;
+        int i = 0;
+        parameters.setLimit(max);
+
+        while (fetchedSize == (max * i)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("i={}, fetchedSize={}, unsafeGUIDs.size()={}", i, 
fetchedSize, unsafeGUIDs.size());
+            }
+
+            int offset = max * i;
+            parameters.setOffset(offset);
+
+            AtlasSearchResult searchResult = 
discoveryService.searchWithParameters(parameters);
+
+            if (CollectionUtils.isEmpty(searchResult.getEntities())) {
+                break;
+            }
+
+            for (AtlasEntityHeader entityHeader : searchResult.getEntities()) {
+                String guid = entityHeader.getGuid();
+                if (!excludeGUIDs.contains(guid)) {
+                    unsafeGUIDs.add(guid);
+                }
+            }
+            fetchedSize = searchResult.getEntities().size();
+            i++;
+        }
+        return unsafeGUIDs;
+    }
+
+    private SearchParameters getSearchParameters(String dbName, String 
sourceCluster) {
+        String query = String.format(QUERY_DB_NAME_EQUALS, dbName);
+
+        SearchParameters parameters = new SearchParameters();
+        parameters.setExcludeDeletedEntities(false);
+        parameters.setTypeName(TYPE_HIVE_TABLE);
+        parameters.setExcludeDeletedEntities(true);
+
+        parameters.setClassification(String.format(REPLICATED_TAG_NAME, 
sourceCluster));
+        parameters.setAttributes(new HashSet<String>(){{ 
add(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); }});
+        parameters.setQuery(query);
+
+        return parameters;
+    }
+
+    private void deleteTables(String sourceCluster, Set<String> guidsToDelete, 
String userName) throws AtlasBaseException {
+        if (!CollectionUtils.isEmpty(guidsToDelete)) {
+            entityStore.deleteByIds(new ArrayList<>(guidsToDelete));
+
+            endTstamp = System.currentTimeMillis();
+            createAuditEntry(sourceCluster, guidsToDelete, userName);
+        }
+    }
+
+    private void createAuditEntry(String sourceCluster, Set<String> 
guidsToDelete, String userName) throws AtlasBaseException {
+        auditsWriter.write(userName, sourceCluster, startTstamp, endTstamp, 
guidsToDelete);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleted entities => {}", guidsToDelete);
+        }
+    }
+}
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 2aee233..52b509f 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -23,11 +23,14 @@ import org.apache.atlas.RequestContextV1;
 import org.apache.atlas.TestModules;
 import org.apache.atlas.TestUtilsV2;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.mockito.invocation.InvocationOnMock;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.commons.lang.StringUtils;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,25 +40,33 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
-
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromResourcesJson;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParametersUsingBackingDirectory;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static 
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE;
+import static 
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
+import static 
org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_SKIP_LINEAGE;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
+import static 
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ImportServiceTest extends ExportImportTestBase {
@@ -215,7 +226,7 @@ public class ImportServiceTest extends ExportImportTestBase 
{
 
     @Test
     public void importServiceProcessesIOException() {
-        ImportService importService = new ImportService(typeDefStore, 
typeRegistry, null, null,null);
+        ImportService importService = new ImportService(typeDefStore, 
typeRegistry, null,null, null,null);
         AtlasImportRequest req = mock(AtlasImportRequest.class);
 
         Answer<Map> answer = new Answer<Map>() {
@@ -281,6 +292,68 @@ public class ImportServiceTest extends 
ExportImportTestBase {
 
     @Test(expectedExceptions = AtlasBaseException.class)
     public void importEmptyZip() throws IOException, AtlasBaseException {
-        new ZipSource((InputStream) getZipSource("empty.zip")[0][0]);
+        new ZipSource(getInputStreamFrom("empty.zip"));
+    }
+
+    @Test
+    public void testCheckHiveTableIncrementalSkipLineage() {
+        AtlasImportRequest importRequest;
+        AtlasExportRequest exportRequest;
+
+        importRequest = getImportRequest("cl1");
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, 
getItemsToExport("hive_table", "hive_table"));
+        
assertTrue(importService.checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, 
getItemsToExport("hive_table", "hive_db", "hive_table"));
+        
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_FULL, "cl2", true, 
getItemsToExport("hive_table", "hive_table"));
+        
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest));
+
+        exportRequest = getExportRequest(FETCH_TYPE_FULL, "", true, 
getItemsToExport("hive_table", "hive_table"));
+        
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest));
+
+        importRequest = getImportRequest("");
+        exportRequest = getExportRequest(FETCH_TYPE_INCREMENTAL, "cl2", true, 
getItemsToExport("hive_table", "hive_table"));
+        
assertFalse(importService.checkHiveTableIncrementalSkipLineage(importRequest, 
exportRequest));
+    }
+
+    private AtlasImportRequest getImportRequest(String replicatedFrom){
+        AtlasImportRequest importRequest = getDefaultImportRequest();
+
+        if (!StringUtils.isEmpty(replicatedFrom)) {
+            
importRequest.setOption(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, 
replicatedFrom);
+        }
+        return importRequest;
+    }
+
+    private AtlasExportRequest getExportRequest(String fetchType, String 
replicatedTo, boolean skipLineage, List<AtlasObjectId> itemsToExport){
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        request.setOptions(getOptionsMap(fetchType, replicatedTo, 
skipLineage));
+        request.setItemsToExport(itemsToExport);
+        return request;
+    }
+
+    private List<AtlasObjectId> getItemsToExport(String... typeNames){
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        for (String typeName : typeNames) {
+            itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", 
"db.table@cluster"));
+        }
+        return itemsToExport;
+    }
+
+    private Map<String, Object> getOptionsMap(String fetchType, String 
replicatedTo, boolean skipLineage){
+        Map<String, Object> options = new HashMap<>();
+
+        if (!StringUtils.isEmpty(fetchType)) {
+            options.put(OPTION_FETCH_TYPE, fetchType);
+        }
+        if (!StringUtils.isEmpty(replicatedTo)) {
+            options.put(OPTION_KEY_REPLICATED_TO, replicatedTo);
+        }
+        options.put(OPTION_SKIP_LINEAGE, skipLineage);
+
+        return options;
     }
 }
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
new file mode 100644
index 0000000..9d6e616
--- /dev/null
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/TableReplicationRequestProcessorTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.RequestContextV1;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.ITestContext;
+import org.testng.SkipException;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getDefaultImportRequest;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getInputStreamFrom;
+import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithParameters;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class TableReplicationRequestProcessorTest extends ExportImportTestBase 
{
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableReplicationRequestProcessorTest.class);
+
+    private static final String ENTITY_GUID_REPLICATED = 
"718a6d12-35a8-4731-aff8-3a64637a43a3";
+    private static final String ENTITY_GUID_NOT_REPLICATED_1 = 
"e19e5683-d9ae-436a-af1e-0873582d0f1e";
+    private static final String ENTITY_GUID_NOT_REPLICATED_2 = 
"2e28ae34-576e-4a8b-be48-cf5f925d7b15";
+    private static final String REPL_FROM = "cl1";
+    private static final String REPL_TRANSFORMER = 
"[{\"conditions\":{\"__entity\":\"topLevel: \"}," +
+            "\"action\":{\"__entity\":\"ADD_CLASSIFICATION: 
cl1_replicated\"}}," +
+            
"{\"action\":{\"__entity.replicatedTo\":\"CLEAR:\",\"__entity.replicatedFrom\":\"CLEAR:\"}},"
 +
+            "{\"conditions\":{\"hive_db.clusterName\":\"EQUALS: 
cl1\"},\"action\":{\"hive_db.clusterName\":\"SET: cl2\"}}," +
+            "{\"conditions\":{\"hive_db.location\":\"STARTS_WITH_IGNORE_CASE: 
file:///\"}," +
+            "\"action\":{\"hive_db.location\":\"REPLACE_PREFIX: = 
:file:///=file:///\"}}," +
+            
"{\"conditions\":{\"hive_storagedesc.location\":\"STARTS_WITH_IGNORE_CASE: 
file:///\"}," +
+            "\"action\":{\"hive_storagedesc.location\":\"REPLACE_PREFIX: = 
:file:///=file:///\"}}]";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    private AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasEntityStore entityStore;
+
+    @Inject
+    private ExportImportAuditService auditService;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @BeforeTest
+    public void setupTest() throws IOException, AtlasBaseException {
+        RequestContextV1.clear();
+        RequestContextV1.get().setUser(TestUtilsV2.TEST_USER);
+        basicSetup(typeDefStore, typeRegistry);
+    }
+
+    @DataProvider(name = "source1")
+    public static Object[][] getData1(ITestContext context) throws 
IOException, AtlasBaseException {
+        return getZipSource("repl_exp_1.zip");
+    }
+
+    public static InputStream getData2() {
+        return getInputStreamFrom("repl_exp_2.zip");
+    }
+
+    @Test(dataProvider = "source1")
+    public void importWithIsReplTrue(InputStream zipSource) throws 
AtlasBaseException, IOException {
+        AtlasImportRequest atlasImportRequest = getDefaultImportRequest();
+
+        atlasImportRequest.setOption("replicatedFrom", REPL_FROM);
+        atlasImportRequest.setOption("transformers", REPL_TRANSFORMER);
+
+        runImportWithParameters(importService, atlasImportRequest, zipSource);
+
+        runImportWithParameters(importService, atlasImportRequest, getData2());
+
+        assertAuditEntry();
+    }
+
+    private void assertAuditEntry() {
+        pauseForIndexCreation();
+        List<ExportImportAuditEntry> result;
+        try {
+            result = auditService.get("", "IMPORT_DELETE_REPL", "", "",  "", 
10, 0);
+        } catch (Exception e) {
+            throw new SkipException("audit entries not retrieved.");
+        }
+
+        assertNotNull(result);
+        assertTrue(result.size() > 0);
+
+        List<String> deletedGuids = 
AtlasType.fromJson(result.get(0).getResultSummary(), List.class);
+        assertNotNull(deletedGuids);
+        assertFalse(deletedGuids.contains(ENTITY_GUID_REPLICATED));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_1));
+        assertTrue(deletedGuids.contains(ENTITY_GUID_NOT_REPLICATED_2));
+    }
+}

Reply via email to