ATLAS-2814: Cluster stores replication details.

Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/6f10481d
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/6f10481d
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/6f10481d

Branch: refs/heads/branch-1.0
Commit: 6f10481d0f846f5ca4f321ffd4bd8ddc8305a081
Parents: 39cc4b0
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Thu Aug 16 12:11:39 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:54 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasBaseClient.java  |   8 +-
 .../atlas/model/clusterinfo/AtlasCluster.java   | 115 ----------
 .../apache/atlas/model/impexp/AtlasCluster.java | 154 ++++++++++++++
 .../atlas/model/impexp/AtlasExportResult.java   |  16 ++
 .../atlas/model/impexp/AtlasImportResult.java   |   9 +
 .../atlas/repository/impexp/AuditsWriter.java   |  82 ++++----
 .../atlas/repository/impexp/ClusterService.java |  77 ++++---
 .../impexp/ExportImportAuditService.java        | 109 +++++++---
 .../atlas/repository/impexp/ExportService.java  |  46 ++--
 .../atlas/repository/impexp/ImportService.java  |  12 ++
 .../repository/impexp/ImportTransformer.java    | 209 ++++++++++++++++++-
 .../atlas/repository/ogm/AtlasClusterDTO.java   |   3 +-
 .../apache/atlas/repository/ogm/DataAccess.java |  20 ++
 .../ogm/ExportImportAuditEntryDTO.java          |  41 ++--
 .../store/graph/v2/EntityGraphMapper.java       |   8 +
 .../store/graph/v2/EntityGraphRetriever.java    |   6 +-
 .../repository/impexp/ClusterServiceTest.java   |  59 +++---
 .../impexp/ExportImportAuditServiceTest.java    |  30 +--
 .../repository/impexp/ExportImportTestBase.java |  24 ++-
 .../impexp/ExportIncrementalTest.java           |   3 -
 .../repository/impexp/ImportServiceTest.java    |   2 +-
 .../repository/impexp/ImportTransformsTest.java | 128 ++++++++++--
 .../impexp/ReplicationEntityAttributeTest.java  |  54 ++---
 .../stocksDB-Entities/replicationAttrs.json     |   5 +-
 .../atlas/web/resources/AdminResource.java      |  24 +--
 .../web/resources/AdminExportImportTestIT.java  |  47 ++++-
 .../test/resources/json/export-incremental.json |   4 +-
 webapp/src/test/resources/stocks-base.zip       | Bin 13166 -> 17706 bytes
 28 files changed, 920 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
----------------------------------------------------------------------
diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
index df021ad..c247902 100644
--- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
+++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
@@ -37,6 +37,7 @@ import com.sun.jersey.multipart.MultiPart;
 import com.sun.jersey.multipart.file.FileDataBodyPart;
 import com.sun.jersey.multipart.file.StreamDataBodyPart;
 import com.sun.jersey.multipart.impl.MultiPartWriter;
+import org.apache.atlas.model.impexp.AtlasCluster;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
@@ -79,7 +80,7 @@ public abstract class AtlasBaseClient {
     public static final String ADMIN_METRICS = "admin/metrics";
     public static final String ADMIN_IMPORT = "admin/import";
     public static final String ADMIN_EXPORT = "admin/export";
-    public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
+    public static final String ADMIN_CLUSTER_TEMPLATE = "%sadmin/cluster/%s";
 
     public static final String QUERY = "query";
     public static final String LIMIT = "limit";
@@ -519,6 +520,11 @@ public abstract class AtlasBaseClient {
         return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
     }
 
+    public AtlasCluster getCluster(String clusterName) throws AtlasServiceException {
+        API api = new API(String.format(ADMIN_CLUSTER_TEMPLATE, BASE_URI, clusterName), HttpMethod.GET, Response.Status.OK);
+        return callAPI(api, AtlasCluster.class, null);
+    }
+
     boolean isRetryableException(ClientHandlerException che) {
         return che.getCause().getClass().equals(IOException.class)
                 || che.getCause().getClass().equals(ConnectException.class);

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java
deleted file mode 100644
index efea55a..0000000
--- a/intg/src/main/java/org/apache/atlas/model/clusterinfo/AtlasCluster.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.model.clusterinfo;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.atlas.model.AtlasBaseModelObject;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
-import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AtlasCluster extends AtlasBaseModelObject implements Serializable 
{
-    private static final long serialVersionUID = 1L;
-
-    public static final String SYNC_INFO_KEY            = "syncInfo";
-    public static final String OPERATION                = "operation";
-    public static final String NEXT_MODIFIED_TIMESTAMP  = 
"nextModifiedTimestamp";
-
-    private String name;
-    private String qualifiedName;
-    private Map<String, String> additionalInfo;
-    private List<String> urls;
-
-    public AtlasCluster() {
-        urls = new ArrayList<>();
-    }
-
-    public AtlasCluster(String name, String qualifiedName) {
-        this.name = name;
-        this.qualifiedName = qualifiedName;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return this.name;
-    }
-
-    public void setAdditionalInfo(Map<String, String> additionalInfo) {
-        if(this.additionalInfo == null) {
-            this.additionalInfo = new HashMap<>();
-        }
-
-        this.additionalInfo = additionalInfo;
-    }
-
-    public void setAdditionalInfo(String key, String value) {
-        if(this.additionalInfo == null) {
-            this.additionalInfo = new HashMap<>();
-        }
-
-        additionalInfo.put(key, value);
-    }
-
-    public Map<String, String> getAdditionalInfo() {
-        return this.additionalInfo;
-    }
-
-    public String getAdditionalInfo(String key) {
-        return additionalInfo.get(key);
-    }
-
-    public String getQualifiedName() {
-        return qualifiedName;
-    }
-
-    public void setQualifiedName(String qualifiedName) {
-        this.qualifiedName = qualifiedName;
-    }
-
-    public void setUrls(List<String> urls) {
-        this.urls = urls;
-    }
-
-    public List<String> getUrls() {
-        return this.urls;
-    }
-
-    @Override
-    public StringBuilder toString(StringBuilder sb) {
-        sb.append(", name=").append(name);
-        sb.append(", qualifiedName=").append(getQualifiedName());
-        sb.append(", urls=").append(urls);
-        sb.append(", additionalInfo=").append(additionalInfo);
-        sb.append("}");
-        return sb;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java
new file mode 100644
index 0000000..f70a219
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasCluster.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.impexp;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.type.AtlasType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasCluster extends AtlasBaseModelObject implements Serializable 
{
+    private static final long serialVersionUID = 1L;
+
+    public static final String KEY_REPLICATION_DETAILS = "REPL_DETAILS";
+
+    private String name;
+    private String qualifiedName;
+    private Map<String, String> additionalInfo;
+    private List<String> urls;
+
+    public AtlasCluster() {
+        urls = new ArrayList<>();
+        additionalInfo = new HashMap<>();
+    }
+
+    public AtlasCluster(String name, String qualifiedName) {
+        this();
+        this.name = name;
+        this.qualifiedName = qualifiedName;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setAdditionalInfo(Map<String, String> additionalInfo) {
+        this.additionalInfo = additionalInfo;
+    }
+
+    public void setAdditionalInfo(String key, String value) {
+        if(additionalInfo == null) {
+            additionalInfo = new HashMap<>();
+        }
+
+        additionalInfo.put(key, value);
+    }
+
+    public void setAdditionalInfoRepl(String guid, long modifiedTimestamp) {
+        Map<String, Object> replicationDetailsMap = null;
+
+        if(additionalInfo != null && 
additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) {
+            replicationDetailsMap = 
AtlasType.fromJson(getAdditionalInfo().get(KEY_REPLICATION_DETAILS), Map.class);
+        }
+
+        if(replicationDetailsMap == null) {
+            replicationDetailsMap = new HashMap<>();
+        }
+
+        if(modifiedTimestamp == 0) {
+            replicationDetailsMap.remove(guid);
+        } else {
+            replicationDetailsMap.put(guid, modifiedTimestamp);
+        }
+
+        updateReplicationMap(replicationDetailsMap);
+    }
+
+    private void updateReplicationMap(Map<String, Object> 
replicationDetailsMap) {
+        String json = AtlasType.toJson(replicationDetailsMap);
+        setAdditionalInfo(KEY_REPLICATION_DETAILS, json);
+    }
+
+
+    public Object getAdditionalInfoRepl(String guid) {
+        if(additionalInfo == null || 
!additionalInfo.containsKey(KEY_REPLICATION_DETAILS)) {
+            return null;
+        }
+
+        String key = guid;
+        String mapJson = additionalInfo.get(KEY_REPLICATION_DETAILS);
+
+        Map<String, String> replicationDetailsMap = 
AtlasType.fromJson(mapJson, Map.class);
+        if(!replicationDetailsMap.containsKey(key)) {
+            return null;
+        }
+
+        return replicationDetailsMap.get(key);
+    }
+
+    public Map<String, String> getAdditionalInfo() {
+        return this.additionalInfo;
+    }
+
+    public String getAdditionalInfo(String key) {
+        return additionalInfo.get(key);
+    }
+
+    public String getQualifiedName() {
+        return qualifiedName;
+    }
+
+    public void setQualifiedName(String qualifiedName) {
+        this.qualifiedName = qualifiedName;
+    }
+
+    public void setUrls(List<String> urls) {
+        this.urls = urls;
+    }
+
+    public List<String> getUrls() {
+        return this.urls;
+    }
+
+    @Override
+    public StringBuilder toString(StringBuilder sb) {
+        sb.append(", name=").append(name);
+        sb.append(", qualifiedName=").append(getQualifiedName());
+        sb.append(", urls=").append(urls);
+        sb.append(", additionalInfo=").append(additionalInfo);
+        sb.append("}");
+        return sb;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index 14a1f65..a5203c9 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -173,6 +173,22 @@ public class AtlasExportResult implements Serializable {
         metrics.put(key, currentValue + incrementBy);
     }
 
+    public AtlasExportResult shallowCopy() {
+        AtlasExportResult result  = new AtlasExportResult();
+
+        result.setRequest(getRequest());
+        result.setUserName(getUserName());
+        result.setClientIpAddress(getClientIpAddress());
+        result.setHostName(getHostName());
+        result.setTimeStamp(getTimeStamp());
+        result.setMetrics(getMetrics());
+        result.setOperationStatus(getOperationStatus());
+        result.setSourceClusterName(getSourceClusterName());
+        result.setLastModifiedTimestamp(getLastModifiedTimestamp());
+
+        return result;
+    }
+
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
             sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index b97cbb3..30e93d5 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -56,6 +56,7 @@ public class AtlasImportResult {
     private Map<String, Integer> metrics;
     private List<String>         processedEntities;
     private OperationStatus      operationStatus;
+    private AtlasExportResult    exportResultWithoutData;
 
     public AtlasImportResult() {
         this(null, null, null, null, System.currentTimeMillis());
@@ -143,6 +144,14 @@ public class AtlasImportResult {
 
     public List<String> getProcessedEntities() { return this.processedEntities; }
 
+    public AtlasExportResult getExportResult() {
+        return exportResultWithoutData;
+    }
+
+    public void setExportResult(AtlasExportResult exportResult) {
+        this.exportResultWithoutData = exportResult.shallowCopy();
+    }
+
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
             sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 6a3fbec..467d383 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -22,19 +22,24 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.clusterinfo.AtlasCluster;
+import org.apache.atlas.model.impexp.AtlasCluster;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.type.AtlasType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
 
 import javax.inject.Inject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -55,7 +60,9 @@ public class AuditsWriter {
         this.auditService = auditService;
     }
 
-    public void write(String userName, AtlasExportResult result, long 
startTime, long endTime, List<String> entityCreationOrder) throws 
AtlasBaseException {
+    public void write(String userName, AtlasExportResult result,
+                      long startTime, long endTime,
+                      List<String> entityCreationOrder) throws 
AtlasBaseException {
         auditForExport.add(userName, result, startTime, endTime, 
entityCreationOrder);
     }
 
@@ -67,15 +74,17 @@ public class AuditsWriter {
         return options.containsKey(replicatedKey);
     }
 
-    private void updateReplicationAttribute(boolean isReplicationSet, String 
clusterName,
+    private void updateReplicationAttribute(boolean isReplicationSet,
+                                            String clusterName,
                                             List<String> exportedGuids,
-                                            String attrNameReplicated) throws 
AtlasBaseException {
-        if (!isReplicationSet) {
+                                            String attrNameReplicated,
+                                            long lastModifiedTimestamp) throws 
AtlasBaseException {
+        if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
             return;
         }
 
-        AtlasCluster cluster = saveCluster(clusterName);
-        clusterService.updateEntityWithCluster(cluster, exportedGuids, 
attrNameReplicated);
+        AtlasCluster cluster = saveCluster(clusterName, exportedGuids.get(0), 
lastModifiedTimestamp);
+        clusterService.updateEntitiesWithCluster(cluster, exportedGuids, 
attrNameReplicated);
     }
 
     private String getClusterNameFromOptions(Map options, String key) {
@@ -84,27 +93,14 @@ public class AuditsWriter {
                 : "";
     }
 
-    private void addAuditEntry(String userName, String sourceCluster, String 
targetCluster, String operation,
-                               String result, long startTime, long endTime, 
boolean hasData) throws AtlasBaseException {
-        if(!hasData) return;
-
-        ExportImportAuditEntry entry = new ExportImportAuditEntry();
-
-        entry.setUserName(userName);
-        entry.setSourceClusterName(sourceCluster);
-        entry.setTargetClusterName(targetCluster);
-        entry.setOperation(operation);
-        entry.setResultSummary(result);
-        entry.setStartTime(startTime);
-        entry.setEndTime(endTime);
-
-        auditService.save(entry);
-        LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: 
{}", entry.getUserName(),
-                            entry.getSourceClusterName(), 
entry.getTargetClusterName(), entry.getOperation());
+    private AtlasCluster saveCluster(String clusterName) throws 
AtlasBaseException {
+        AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
+        return clusterService.save(cluster);
     }
 
-    private AtlasCluster saveCluster(String clusterName) throws 
AtlasBaseException {
+    private AtlasCluster saveCluster(String clusterName, String entityGuid, 
long lastModifiedTimestamp) throws AtlasBaseException {
         AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
+        cluster.setAdditionalInfoRepl(entityGuid, lastModifiedTimestamp);
         return clusterService.save(cluster);
     }
 
@@ -128,22 +124,25 @@ public class AuditsWriter {
         public void add(String userName, AtlasExportResult result, long 
startTime, long endTime, List<String> entitityGuids) throws AtlasBaseException {
             optionKeyReplicatedTo = 
AtlasExportRequest.OPTION_KEY_REPLICATED_TO;
             request = result.getRequest();
-            cluster = saveCluster(getCurrentClusterName());
             replicationOptionState = 
isReplicationOptionSet(request.getOptions(), optionKeyReplicatedTo);
             targetClusterName = 
getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedTo);
 
-            addAuditEntry(userName,
-                    cluster.getName(), targetClusterName,
+            cluster = saveCluster(getCurrentClusterName());
+
+            auditService.add(userName, getCurrentClusterName(), 
targetClusterName,
                     ExportImportAuditEntry.OPERATION_EXPORT,
                     AtlasType.toJson(result), startTime, endTime, 
!entitityGuids.isEmpty());
 
-            updateReplicationAttributeForExport(entitityGuids, request);
+            updateReplicationAttributeForExport(request, entitityGuids);
         }
 
-        private void updateReplicationAttributeForExport(List<String> 
entityGuids, AtlasExportRequest request) throws AtlasBaseException {
-            if(!replicationOptionState) return;
+        private void updateReplicationAttributeForExport(AtlasExportRequest 
request, List<String> entityGuids) throws AtlasBaseException {
+            if(!replicationOptionState) {
+                return;
+            }
 
-            updateReplicationAttribute(replicationOptionState, 
targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER);
+            updateReplicationAttribute(replicationOptionState, 
targetClusterName,
+                    entityGuids, Constants.ATTR_NAME_REPLICATED_TO_CLUSTER, 
0L);
         }
     }
 
@@ -159,12 +158,13 @@ public class AuditsWriter {
             request = result.getRequest();
             optionKeyReplicatedFrom = 
AtlasImportRequest.OPTION_KEY_REPLICATED_FROM;
             replicationOptionState = 
isReplicationOptionSet(request.getOptions(), optionKeyReplicatedFrom);
-            cluster = saveCluster(getClusterNameFromOptionsState());
 
-            String sourceCluster = 
getClusterNameFromOptions(request.getOptions(), optionKeyReplicatedFrom);
-            addAuditEntry(userName,
-                    sourceCluster, cluster.getName(),
-                    ExportImportAuditEntry.OPERATION_EXPORT, 
AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
+            String sourceCluster = getClusterNameFromOptionsState();
+            cluster = saveCluster(sourceCluster);
+
+            auditService.add(userName,
+                    sourceCluster, getCurrentClusterName(),
+                    ExportImportAuditEntry.OPERATION_IMPORT, 
AtlasType.toJson(result), startTime, endTime, !entitityGuids.isEmpty());
 
             updateReplicationAttributeForImport(entitityGuids);
         }
@@ -173,13 +173,17 @@ public class AuditsWriter {
             if(!replicationOptionState) return;
 
             String targetClusterName = cluster.getName();
-            updateReplicationAttribute(replicationOptionState, 
targetClusterName, entityGuids, Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER);
+
+            updateReplicationAttribute(replicationOptionState, 
targetClusterName,
+                    entityGuids,
+                    Constants.ATTR_NAME_REPLICATED_FROM_CLUSTER,
+                    result.getExportResult().getLastModifiedTimestamp());
         }
 
         private String getClusterNameFromOptionsState() {
             return replicationOptionState
                     ? getClusterNameFromOptions(request.getOptions(), 
optionKeyReplicatedFrom)
-                    : getCurrentClusterName();
+                    : "";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
index ab3333d..5da4b75 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ClusterService.java
@@ -21,12 +21,18 @@ package org.apache.atlas.repository.impexp;
 import org.apache.atlas.annotation.AtlasService;
 import org.apache.atlas.annotation.GraphTransaction;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.clusterinfo.AtlasCluster;
+import org.apache.atlas.model.impexp.AtlasCluster;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.ogm.DataAccess;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,21 +47,24 @@ public class ClusterService {
 
     private final DataAccess dataAccess;
     private final AtlasEntityStore entityStore;
+    private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphRetriever entityGraphRetriever;
 
     @Inject
-    public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore) 
{
+    public ClusterService(DataAccess dataAccess, AtlasEntityStore entityStore, 
AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever) {
         this.dataAccess = dataAccess;
         this.entityStore = entityStore;
+        this.typeRegistry = typeRegistry;
+        this.entityGraphRetriever = entityGraphRetriever;
     }
 
-    public AtlasCluster get(AtlasCluster cluster) {
+    public AtlasCluster get(AtlasCluster cluster) throws AtlasBaseException {
         try {
             return dataAccess.load(cluster);
         } catch (AtlasBaseException e) {
             LOG.error("dataAccess", e);
+            throw e;
         }
-
-        return null;
     }
 
     @GraphTransaction
@@ -68,14 +77,15 @@ public class ClusterService {
     }
 
     @GraphTransaction
-    public void updateEntityWithCluster(AtlasCluster cluster, List<String> 
guids, String attributeName) throws AtlasBaseException {
-        if(cluster != null && StringUtils.isEmpty(cluster.getGuid())) return;
+    public void updateEntitiesWithCluster(AtlasCluster cluster, List<String> 
entityGuids, String attributeName) throws AtlasBaseException {
+        if (cluster != null && StringUtils.isEmpty(cluster.getGuid())) {
+            return;
+        }
 
         AtlasObjectId objectId = getObjectId(cluster);
-        for (String guid : guids) {
+        for (String guid : entityGuids) {
             AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
entityStore.getById(guid);
             updateAttribute(entityWithExtInfo, attributeName, objectId);
-            entityStore.createOrUpdate(new 
AtlasEntityStream(entityWithExtInfo), true);
         }
     }
 
@@ -88,33 +98,46 @@ public class ClusterService {
      * Attribute passed by name is updated with the value passed.
      * @param entityWithExtInfo Entity to be updated
      * @param propertyName attribute name
-     * @param value Value to be set for attribute
+     * @param objectId Value to be set for attribute
      */
-    private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo, String propertyName, Object value) {
+    private void updateAttribute(AtlasEntity.AtlasEntityWithExtInfo 
entityWithExtInfo,
+                                 String propertyName,
+                                 AtlasObjectId objectId) {
+        String value = EntityGraphMapper.getSoftRefFormattedValue(objectId);
         updateAttribute(entityWithExtInfo.getEntity(), propertyName, value);
         for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) 
{
             updateAttribute(e, propertyName, value);
         }
     }
 
-    private void updateAttribute(AtlasEntity e, String propertyName, Object 
value) {
-        if(e.hasAttribute(propertyName) == false) return;
-
-        Object oVal = e.getAttribute(propertyName);
-        if (oVal != null && !(oVal instanceof List)) return;
-
-        List list;
+    private void updateAttribute(AtlasEntity entity, String attributeName, 
Object value) {
+        if(entity.hasAttribute(attributeName) == false) return;
 
-        if (oVal == null) {
-            list = new ArrayList();
-        } else {
-            list = (List) oVal;
+        try {
+            AtlasVertex vertex = 
entityGraphRetriever.getEntityVertex(entity.getGuid());
+            if(vertex == null) {
+                return;
+            }
+
+            String qualifiedFieldName = getVertexPropertyName(entity, 
attributeName);
+            List list = vertex.getListProperty(qualifiedFieldName);
+            if (CollectionUtils.isEmpty(list)) {
+                list = new ArrayList();
+            }
+
+            if (!list.contains(value)) {
+                list.add(value);
+                vertex.setListProperty(qualifiedFieldName, list);
+            }
         }
-
-        if (!list.contains(value)) {
-            list.add(value);
+        catch (AtlasBaseException ex) {
+            LOG.error("error retrieving vertex from guid: {}", 
entity.getGuid(), ex);
         }
+    }
 
-        e.setAttribute(propertyName, list);
+    /**
+     * Looks up the vertex property name of an attribute on the entity's type.
+     *
+     * @param entity        entity whose type name is resolved through the type registry
+     * @param attributeName name of the attribute to look up on that type
+     * @return the vertex property name reported by the attribute
+     * @throws AtlasBaseException propagated from the type-registry lookup
+     */
+    private String getVertexPropertyName(AtlasEntity entity, String attributeName) throws AtlasBaseException {
+        final String          typeName   = entity.getTypeName();
+        final AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(typeName);
+
+        return entityType.getAttribute(attributeName).getVertexPropertyName();
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java
index f7e32dc..89b1110 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportImportAuditService.java
@@ -25,14 +25,19 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.discovery.SearchParameters;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.repository.ogm.DataAccess;
 import org.apache.atlas.repository.ogm.ExportImportAuditEntryDTO;
+import org.apache.cassandra.cql3.statements.Restriction;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 
 import javax.inject.Inject;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 
 @AtlasService
 public class ExportImportAuditService {
@@ -59,17 +64,43 @@ public class ExportImportAuditService {
         return dataAccess.load(entry);
     }
 
-    public AtlasSearchResult get(String userName, String operation, String 
sourceCluster, String targetCluster,
-                                 String startTime, String endTime,
-                                 int limit, int offset) throws 
AtlasBaseException {
+    public List<ExportImportAuditEntry> get(String userName, String operation, 
String cluster,
+                                            String startTime, String endTime,
+                                            int limit, int offset) throws 
AtlasBaseException {
         SearchParameters.FilterCriteria criteria = new 
SearchParameters.FilterCriteria();
-        criteria.setCriterion(new 
ArrayList<SearchParameters.FilterCriteria>());
+        criteria.setCondition(SearchParameters.FilterCriteria.Condition.AND);
+        criteria.setCriterion(new ArrayList<>());
 
-        addSearchParameters(criteria, userName, operation, sourceCluster, 
targetCluster, startTime, endTime);
+        addSearchParameters(criteria, userName, operation, cluster, startTime, 
endTime);
 
         SearchParameters searchParameters = getSearchParameters(limit, offset, 
criteria);
+        searchParameters.setAttributes(getAuditEntityAttributes());
 
-        return discoveryService.searchWithParameters(searchParameters);
+        AtlasSearchResult result = 
discoveryService.searchWithParameters(searchParameters);
+        return toExportImportAuditEntry(result);
+    }
+
+    /**
+     * Returns the attribute-name set exposed by {@link ExportImportAuditEntryDTO#getAttributes()}.
+     */
+    private Set<String> getAuditEntityAttributes() {
+        return ExportImportAuditEntryDTO.getAttributes();
+    }
+
+    /**
+     * Converts the entity headers of a search result into audit entries.
+     * Headers for which the DTO conversion yields {@code null} are left out.
+     *
+     * @param result search result whose entity headers are converted
+     * @return the converted entries; empty when the result carries no entities
+     */
+    private List<ExportImportAuditEntry> toExportImportAuditEntry(AtlasSearchResult result) {
+        final List<ExportImportAuditEntry> entries = new ArrayList<>();
+
+        if (!CollectionUtils.isEmpty(result.getEntities())) {
+            for (AtlasEntityHeader header : result.getEntities()) {
+                final ExportImportAuditEntry converted =
+                        ExportImportAuditEntryDTO.from(header.getGuid(), header.getAttributes());
+
+                if (converted != null) {
+                    entries.add(converted);
+                }
+            }
+        }
+
+        return entries;
     }
 
     private SearchParameters getSearchParameters(int limit, int offset, 
SearchParameters.FilterCriteria criteria) {
@@ -78,46 +109,64 @@ public class ExportImportAuditService {
         searchParameters.setEntityFilters(criteria);
         searchParameters.setLimit(limit);
         searchParameters.setOffset(offset);
+
         return searchParameters;
     }
 
-    private void addSearchParameters(SearchParameters.FilterCriteria criteria,
-                                     String userName, String operation, String 
sourceCluster, String targetCluster,
-                                     String startTime, String endTime) {
-
+    private void addSearchParameters(SearchParameters.FilterCriteria criteria, 
String userName, String operation,
+                                     String cluster, String startTime, String 
endTime) {
         addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_USER_NAME, userName);
         addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_OPERATION, operation);
-        addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, sourceCluster);
-        addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, targetCluster);
         addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_START_TIME, startTime);
         addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_END_TIME, endTime);
+
+        addClusterFilterCriteria(criteria, cluster);
     }
 
-    private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria 
criteria,
-                                             String attributeName, String 
value) {
-        if(StringUtils.isEmpty(value)) return;
+    private void addClusterFilterCriteria(SearchParameters.FilterCriteria 
parentCriteria, String cluster) {
+        if (StringUtils.isEmpty(cluster)) {
+            return;
+        }
 
-        boolean isFirstCriteria = criteria.getAttributeName() == null;
-        SearchParameters.FilterCriteria cx = isFirstCriteria
-                                                ? criteria
-                                                : new 
SearchParameters.FilterCriteria();
+        SearchParameters.FilterCriteria criteria = new 
SearchParameters.FilterCriteria();
+        criteria.setCondition(SearchParameters.FilterCriteria.Condition.OR);
+        criteria.setCriterion(new ArrayList<>());
 
-        setCriteria(cx, attributeName, value);
+        addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_SOURCE_CLUSTER_NAME, cluster);
+        addParameterIfValueNotEmpty(criteria, 
ExportImportAuditEntryDTO.PROPERTY_TARGET_CLUSTER_NAME, cluster);
 
-        if(isFirstCriteria) {
-            cx.setCondition(SearchParameters.FilterCriteria.Condition.AND);
-        }
+        parentCriteria.getCriterion().add(criteria);
+    }
 
-        if(!isFirstCriteria) {
-            criteria.getCriterion().add(cx);
+    private void addParameterIfValueNotEmpty(SearchParameters.FilterCriteria 
criteria, String attributeName, String value) {
+        if(StringUtils.isEmpty(value)) {
+            return;
         }
+
+        SearchParameters.FilterCriteria filterCriteria = new 
SearchParameters.FilterCriteria();
+        filterCriteria.setAttributeName(attributeName);
+        filterCriteria.setAttributeValue(value);
+        filterCriteria.setOperator(SearchParameters.Operator.EQ);
+
+        criteria.getCriterion().add(filterCriteria);
     }
 
-    private SearchParameters.FilterCriteria 
setCriteria(SearchParameters.FilterCriteria criteria, String attributeName, 
String value) {
-        criteria.setAttributeName(attributeName);
-        criteria.setAttributeValue(value);
-        criteria.setOperator(SearchParameters.Operator.EQ);
+    public void add(String userName, String sourceCluster, String 
targetCluster, String operation,
+                               String result, long startTime, long endTime, 
boolean hasData) throws AtlasBaseException {
+        if(!hasData) return;
+
+        ExportImportAuditEntry entry = new ExportImportAuditEntry();
+
+        entry.setUserName(userName);
+        entry.setSourceClusterName(sourceCluster);
+        entry.setTargetClusterName(targetCluster);
+        entry.setOperation(operation);
+        entry.setResultSummary(result);
+        entry.setStartTime(startTime);
+        entry.setEndTime(endTime);
 
-        return criteria;
+        save(entry);
+        LOG.info("addAuditEntry: user: {}, source: {}, target: {}, operation: 
{}", entry.getUserName(),
+                entry.getSourceClusterName(), entry.getTargetClusterName(), 
entry.getOperation());
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 30dd8c1..97c2123 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -125,11 +125,11 @@ public class ExportService {
         
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
         
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
         context.sink.setTypesDef(context.result.getData().getTypesDef());
-        auditsWriter.write(userName, context.result, startTime, endTime, 
context.result.getData().getEntityCreationOrder());
-        clearContextData(context);
+        
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
         context.result.setOperationStatus(getOverallOperationStatus(statuses));
         context.result.incrementMeticsCounter("duration", duration);
-        
context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
+        auditsWriter.write(userName, context.result, startTime, endTime, 
context.result.getData().getEntityCreationOrder());
+        clearContextData(context);
         context.sink.setResult(context.result);
     }
 
@@ -194,9 +194,7 @@ public class ExportService {
     }
 
     private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId 
item, ExportContext context) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> processObjectId({})", item);
-        }
+        debugLog("==> processObjectId({})", item);
 
         try {
             List<String> entityGuids = getStartingEntity(item, context);
@@ -225,11 +223,16 @@ public class ExportService {
             return AtlasExportResult.OperationStatus.FAIL;
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== processObjectId({})", item);
+        debugLog("<== processObjectId({})", item);
+        return AtlasExportResult.OperationStatus.SUCCESS;
+    }
+
+    private void debugLog(String s, Object... params) {
+        if (!LOG.isDebugEnabled()) {
+            return;
         }
 
-        return AtlasExportResult.OperationStatus.SUCCESS;
+        LOG.debug(s, params);
     }
 
     private List<String> getStartingEntity(AtlasObjectId item, ExportContext 
context) throws AtlasBaseException {
@@ -330,9 +333,7 @@ public class ExportService {
     }
 
     private void processEntity(String guid, ExportContext context) throws 
AtlasBaseException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> processEntity({})", guid);
-        }
+        debugLog("==> processEntity({})", guid);
 
         if (!context.guidsProcessed.contains(guid)) {
             TraversalDirection      direction         = 
context.guidDirection.get(guid);
@@ -358,9 +359,7 @@ public class ExportService {
             }
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== processEntity({})", guid);
-        }
+        debugLog("<== processEntity({})", guid);
     }
 
     private void getConntedEntitiesBasedOnOption(AtlasEntity entity, 
ExportContext context, TraversalDirection direction) throws AtlasBaseException {
@@ -403,8 +402,8 @@ public class ExportService {
         for (TraversalDirection direction : directions) {
             String query = getQueryForTraversalDirection(direction);
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} 
query {}", AtlasTypeUtil.getAtlasObjectId(entity), 
context.guidsToProcess.size(), query);
+            if(LOG.isDebugEnabled()) {
+                debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} 
query {}", AtlasTypeUtil.getAtlasObjectId(entity), 
context.guidsToProcess.size(), query);
             }
 
             context.bindings.clear();
@@ -433,8 +432,8 @@ public class ExportService {
                 }
             }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("<== getConnectedEntityGuids({}): found {} guids; 
guidsToProcess {}", entity.getGuid(), result.size(), 
context.guidsToProcess.size());
+            if(LOG.isDebugEnabled()) {
+                debugLog("<== getConnectedEntityGuids({}): found {} guids; 
guidsToProcess {}", entity.getGuid(), result.size(), 
context.guidsToProcess.size());
             }
         }
     }
@@ -451,8 +450,8 @@ public class ExportService {
     }
 
     private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext 
context) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", 
AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        if(LOG.isDebugEnabled()) {
+            debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", 
AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
         }
 
         String query = 
this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
@@ -477,8 +476,9 @@ public class ExportService {
             }
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; 
guidsToProcess {}", entity.getGuid(), result.size(), 
context.guidsToProcess.size());
+        if(LOG.isDebugEnabled()) {
+            debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; 
guidsToProcess {}",
+                                            entity.getGuid(), result.size(), 
context.guidsToProcess.size());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 98ef389..8a184fa 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -26,6 +26,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
@@ -111,6 +112,16 @@ public class ImportService {
 
         updateTransformsWithSubTypes(importTransform);
         source.setImportTransform(importTransform);
+
+        if(LOG.isDebugEnabled()) {
+            debugLog("   => transforms: {}", 
AtlasType.toJson(importTransform));
+        }
+    }
+
+    /**
+     * Writes a parameterized message at debug level; does nothing when debug logging is off.
+     *
+     * @param s      message pattern handed to the logger
+     * @param params arguments for the pattern
+     */
+    private void debugLog(String s, Object... params) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(s, params);
+        }
     }
 
     private void updateTransformsWithSubTypes(ImportTransforms 
importTransforms) throws AtlasBaseException {
@@ -189,6 +200,7 @@ public class ImportService {
 
         endTimestamp = System.currentTimeMillis();
         result.incrementMeticsCounter("duration", 
getDuration(this.endTimestamp, this.startTimestamp));
+        result.setExportResult(importSource.getExportResult());
         auditsWriter.write(userName, result, startTimestamp, endTimestamp, 
importSource.getCreationOrder());
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
index 1b9305c..d68938b 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
@@ -19,12 +19,25 @@ package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.commons.lang.StringUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+
 
 public abstract class ImportTransformer {
     private static final String TRANSFORMER_PARAMETER_SEPARATOR = "\\:";
 
+    private static final String TRANSFORMER_NAME_ADD = "add";
+    private static final String TRANSFORMER_NAME_CLEAR_ATTR = "clearAttrValue";
+    private static final String TRANSFORMER_NAME_LOWERCASE = "lowercase";
+    private static final String TRANSFORMER_NAME_UPPERCASE = "uppercase";
+    private static final String TRANSFORMER_NAME_REMOVE_CLASSIFICATION = 
"removeClassification";
+    private static final String TRANSFORMER_NAME_REPLACE = "replace";
+    private static final String TRANSFORMER_SET_DELETED = "setDeleted";
+
     private final String transformType;
 
 
@@ -36,15 +49,26 @@ public abstract class ImportTransformer {
 
         if (StringUtils.isEmpty(key)) {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error 
creating ImportTransformer. Invalid transformer-specification: {}.", 
transformerSpec);
-        } else if (key.equals("replace")) {
+        } else if (key.equals(TRANSFORMER_NAME_REPLACE)) {
             String toFindStr  = (params == null || params.length < 2) ? "" : 
params[1];
             String replaceStr = (params == null || params.length < 3) ? "" : 
params[2];
 
             ret = new Replace(toFindStr, replaceStr);
-        } else if (key.equals("lowercase")) {
+        } else if (key.equals(TRANSFORMER_NAME_LOWERCASE)) {
             ret = new Lowercase();
-        } else if (key.equals("uppercase")) {
+        } else if (key.equals(TRANSFORMER_NAME_UPPERCASE)) {
             ret = new Uppercase();
+        } else if (key.equals(TRANSFORMER_NAME_REMOVE_CLASSIFICATION)) {
+            String name = (params == null || params.length < 1) ? "" : 
StringUtils.join(params, ":", 1, params.length);
+            ret = new RemoveClassification(name);
+        } else if (key.equals(TRANSFORMER_NAME_ADD)) {
+            String name = (params == null || params.length < 1) ? "" : 
StringUtils.join(params, ":", 1, params.length);
+            ret = new AddValueToAttribute(name);
+        } else if (key.equals(TRANSFORMER_NAME_CLEAR_ATTR)) {
+            String name = (params == null || params.length < 1) ? "" : 
StringUtils.join(params, ":", 1, params.length);
+            ret = new ClearAttributes(name);
+        } else if (key.equals(TRANSFORMER_SET_DELETED)) {
+            ret = new SetDeleted();
         } else {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error 
creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
         }
@@ -66,7 +90,7 @@ public abstract class ImportTransformer {
         private final String replaceStr;
 
         public Replace(String toFindStr, String replaceStr) {
-            super("replace");
+            super(TRANSFORMER_NAME_REPLACE);
 
             this.toFindStr  = toFindStr;
             this.replaceStr = replaceStr;
@@ -77,7 +101,7 @@ public abstract class ImportTransformer {
         public String getReplaceStr() { return replaceStr; }
 
         @Override
-        public Object apply(Object o) throws AtlasBaseException {
+        public Object apply(Object o) {
             Object ret = o;
 
             if(o instanceof String) {
@@ -90,7 +114,7 @@ public abstract class ImportTransformer {
 
     static class Lowercase extends ImportTransformer {
         public Lowercase() {
-            super("lowercase");
+            super(TRANSFORMER_NAME_LOWERCASE);
         }
 
         @Override
@@ -107,7 +131,7 @@ public abstract class ImportTransformer {
 
     static class Uppercase extends ImportTransformer {
         public Uppercase() {
-            super("uppercase");
+            super(TRANSFORMER_NAME_UPPERCASE);
         }
 
         @Override
@@ -121,4 +145,175 @@ public abstract class ImportTransformer {
             return ret;
         }
     }
+
+    /**
+     * Transformer that removes every classification of one type from an entity.
+     * Objects that are not an {@link AtlasEntity} are returned untouched.
+     */
+    static class RemoveClassification extends ImportTransformer {
+        private final String classificationToBeRemoved;
+
+        /**
+         * @param name type name of the classification to be removed
+         */
+        public RemoveClassification(String name) {
+            super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION);
+
+            this.classificationToBeRemoved = name;
+        }
+
+        /**
+         * Removes, in place, all classifications whose type name equals the configured name.
+         *
+         * @param o candidate object; only {@link AtlasEntity} instances are modified
+         * @return the object that was passed in
+         */
+        @Override
+        public Object apply(Object o) {
+            if (!(o instanceof AtlasEntity)) {
+                return o;
+            }
+
+            AtlasEntity entity = (AtlasEntity) o;
+
+            // Guard against a null classification list as well as an empty one.
+            if (entity.getClassifications() == null || entity.getClassifications().isEmpty()) {
+                return o;
+            }
+
+            // Collect first and remove afterwards: the list must not be modified while it is iterated.
+            List<AtlasClassification> toRemove = new ArrayList<AtlasClassification>();
+            for (AtlasClassification classification : entity.getClassifications()) {
+                // Comparing from the configured name avoids an NPE on a classification without a type name.
+                if (classification != null
+                        && classificationToBeRemoved != null
+                        && classificationToBeRemoved.equals(classification.getTypeName())) {
+                    toRemove.add(classification);
+                }
+            }
+
+            if (!toRemove.isEmpty()) {
+                entity.getClassifications().removeAll(toRemove);
+            }
+
+            return entity;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s=%s", "RemoveClassification", classificationToBeRemoved);
+        }
+    }
+
+    /**
+     * Transformer that adds a value to a named attribute of an entity.
+     *
+     * The specification has the form {@code name=value}. A value that starts with
+     * {@code list:} is turned into a single-element list holding the remainder.
+     * A specification without a name or without {@code =} leaves the transformer
+     * unconfigured, in which case {@link #apply(Object)} changes nothing.
+     */
+    static class AddValueToAttribute extends ImportTransformer {
+        private static final String SEPARATOR_EQUALS = "=";
+        private static final String LIST_TYPE_PREFIX = "list:";
+
+        private final String nameValuePair;
+        private String attrName;
+        private String attrValueRaw;
+        private Object attrValue;
+
+        /**
+         * @param nameValuePair specification of the form {@code name=value}
+         */
+        protected AddValueToAttribute(String nameValuePair) {
+            super(TRANSFORMER_NAME_ADD);
+
+            this.nameValuePair = nameValuePair;
+            setAttrNameValue(this.nameValuePair);
+        }
+
+        /**
+         * Splits the specification at the FIRST '=' so that values containing '=' are kept whole.
+         * An empty value ("name=") is accepted and yields the empty string.
+         */
+        private void setAttrNameValue(String nameValuePair) {
+            if (nameValuePair == null) {
+                return;
+            }
+
+            int separatorPos = nameValuePair.indexOf(SEPARATOR_EQUALS);
+            if (separatorPos <= 0) {
+                // No separator, or nothing in front of it: there is no attribute name to work with.
+                return;
+            }
+
+            attrName     = nameValuePair.substring(0, separatorPos);
+            attrValueRaw = nameValuePair.substring(separatorPos + 1);
+
+            setAttrValue(attrValueRaw);
+        }
+
+        /**
+         * Derives the typed value from the raw text; only the leading "list:" marker is stripped.
+         */
+        private void setAttrValue(String attrValueRaw) {
+            if (attrValueRaw.startsWith(LIST_TYPE_PREFIX)) {
+                List<String> values = new ArrayList<String>();
+                values.add(attrValueRaw.substring(LIST_TYPE_PREFIX.length()));
+
+                attrValue = values;
+            } else {
+                attrValue = attrValueRaw;
+            }
+        }
+
+        /**
+         * Adds the configured value to the attribute of the given entity.
+         * <ul>
+         *   <li>attribute currently null: it is set to the configured value;</li>
+         *   <li>attribute is a list: the configured value (or its elements) is appended;</li>
+         *   <li>otherwise: the attribute is set to the raw configured text.</li>
+         * </ul>
+         *
+         * @param o candidate object; only {@link AtlasEntity} instances are modified
+         * @return the object that was passed in
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        public Object apply(Object o) {
+            // instanceof is false for null, so this also covers the null case.
+            if (!(o instanceof AtlasEntity)) {
+                return o;
+            }
+
+            // Malformed specification: never write under a null attribute name.
+            if (attrName == null) {
+                return o;
+            }
+
+            AtlasEntity entity = (AtlasEntity) o;
+            Object attrExistingValue = entity.getAttribute(attrName);
+            if (attrExistingValue == null) {
+                entity.setAttribute(attrName, copyOfAttrValue());
+            } else if (attrExistingValue instanceof List) {
+                // The attribute's element type is not known here, hence the unchecked cast.
+                List<Object> list = (List<Object>) attrExistingValue;
+
+                if (attrValue instanceof List) {
+                    list.addAll((List<?>) attrValue);
+                } else {
+                    list.add(attrValue);
+                }
+            } else {
+                entity.setAttribute(attrName, attrValueRaw);
+            }
+
+            return entity;
+        }
+
+        /**
+         * Hands out a fresh list per entity so that entities never share (and later mutate)
+         * the transformer's own list instance.
+         */
+        private Object copyOfAttrValue() {
+            if (attrValue instanceof List) {
+                return new ArrayList<Object>((List<?>) attrValue);
+            }
+
+            return attrValue;
+        }
+    }
+
+    /**
+     * Transformer that sets a comma-separated list of attributes to {@code null} on an entity.
+     */
+    static class ClearAttributes extends ImportTransformer {
+        private String[] attrNames;
+
+        /**
+         * @param attrNames comma-separated attribute names
+         */
+        protected ClearAttributes(String attrNames) {
+            super(TRANSFORMER_NAME_CLEAR_ATTR);
+
+            this.attrNames = StringUtils.split(attrNames, ",");
+        }
+
+        /**
+         * @param o candidate object; only {@link AtlasEntity} instances are modified
+         * @return the object that was passed in
+         */
+        @Override
+        public Object apply(Object o) {
+            // instanceof is false for null, so one test covers both the null and the wrong-type case.
+            if (o instanceof AtlasEntity) {
+                final AtlasEntity target = (AtlasEntity) o;
+
+                for (int i = 0; i < attrNames.length; i++) {
+                    target.setAttribute(attrNames[i], null);
+                }
+            }
+
+            return o;
+        }
+    }
+
+    /**
+     * Transformer that marks an entity as deleted.
+     */
+    static class SetDeleted extends ImportTransformer {
+        protected SetDeleted() {
+            super(TRANSFORMER_SET_DELETED);
+        }
+
+        /**
+         * @param o candidate object; only {@link AtlasEntity} instances are modified
+         * @return the object that was passed in
+         */
+        @Override
+        public Object apply(Object o) {
+            // instanceof is false for null, so one test covers both the null and the wrong-type case.
+            if (o instanceof AtlasEntity) {
+                ((AtlasEntity) o).setStatus(AtlasEntity.Status.DELETED);
+            }
+
+            return o;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java 
b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java
index 8a89884..3427bd6 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/ogm/AtlasClusterDTO.java
@@ -18,7 +18,7 @@
 
 package org.apache.atlas.repository.ogm;
 
-import org.apache.atlas.model.clusterinfo.AtlasCluster;
+import org.apache.atlas.model.impexp.AtlasCluster;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.springframework.stereotype.Component;
@@ -63,6 +63,7 @@ public class AtlasClusterDTO extends 
AbstractDataTransferObject<AtlasCluster> {
         entity.setAttribute(PROPERTY_CLUSTER_NAME, obj.getName());
         entity.setAttribute(PROPERTY_QUALIFIED_NAME, obj.getQualifiedName());
         entity.setAttribute(PROPERTY_ADDITIONAL_INFO, obj.getAdditionalInfo());
+        entity.setAttribute(PROPERTY_URLS, obj.getUrls());
 
         return entity;
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java 
b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java
index bef7d05..f2df179 100644
--- a/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java
+++ b/repository/src/main/java/org/apache/atlas/repository/ogm/DataAccess.java
@@ -189,6 +189,26 @@ public class DataAccess {
 
     }
 
+    /**
+     * Loads the model object stored under a guid.
+     *
+     * @param guid  guid of the entity to load; an empty or null guid yields {@code null}
+     * @param clazz model class used to pick the transfer object from the DTO registry
+     * @return the model object built from the stored entity, or {@code null} when no entity was obtained
+     * @throws AtlasBaseException propagated from the entity store
+     */
+    public <T extends AtlasBaseModelObject> T load(String guid, Class<? extends AtlasBaseModelObject> clazz) throws AtlasBaseException {
+        final DataTransferObject<T>  transferObject = (DataTransferObject<T>) dtoRegistry.get(clazz);
+        final AtlasEntityWithExtInfo loaded         = StringUtils.isNotEmpty(guid) ? entityStore.getById(guid) : null;
+
+        if (loaded != null) {
+            return transferObject.from(loaded);
+        }
+
+        return null;
+    }
+
+    /**
+     * Deletes the entity with the given guid by delegating directly to the entity store.
+     *
+     * @param guid guid of the entity to delete
+     * @throws AtlasBaseException propagated from the entity store
+     */
+    public void deleteUsingGuid(String guid) throws AtlasBaseException {
+        entityStore.deleteById(guid);
+    }
+
     public void delete(String guid) throws AtlasBaseException {
         Objects.requireNonNull(guid, "guid");
         AtlasPerfTracer perf = null;

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java
 
b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java
index c22d41f..fd19c80 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/ogm/ExportImportAuditEntryDTO.java
@@ -19,14 +19,17 @@
 package org.apache.atlas.repository.ogm;
 
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
+import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.Constants;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import javax.inject.Inject;
 import java.util.Map;
+import java.util.Set;
 
 @Component
 public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<ExportImportAuditEntry> {
@@ -40,30 +43,44 @@ public class ExportImportAuditEntryDTO extends AbstractDataTransferObject<Export
     public static final String PROPERTY_SOURCE_CLUSTER_NAME    = "sourceClusterName";
     public static final String PROPERTY_TARGET_CLUSTER_NAME    = "targetClusterName";
 
+    private static final Set<String> ATTRIBUTE_NAMES = new HashSet<>(Arrays.asList(PROPERTY_USER_NAME,
+            PROPERTY_OPERATION, PROPERTY_OPERATION_PARAMS,
+            PROPERTY_START_TIME, PROPERTY_END_TIME,
+            PROPERTY_RESULT_SUMMARY,
+            PROPERTY_SOURCE_CLUSTER_NAME, PROPERTY_TARGET_CLUSTER_NAME));
+
     @Inject
     public ExportImportAuditEntryDTO(AtlasTypeRegistry typeRegistry) {
         super(typeRegistry, ExportImportAuditEntry.class,
                 Constants.INTERNAL_PROPERTY_KEY_PREFIX + ExportImportAuditEntry.class.getSimpleName());
     }
 
-    @Override
-    public ExportImportAuditEntry from(AtlasEntity entity) {
+    public static Set<String> getAttributes() {
+        return ATTRIBUTE_NAMES;
+    }
+
+    public static ExportImportAuditEntry from(String guid, Map<String,Object> attributes) {
         ExportImportAuditEntry entry = new ExportImportAuditEntry();
 
-        setGuid(entry, entity);
-        entry.setUserName((String) entity.getAttribute(PROPERTY_USER_NAME));
-        entry.setOperation((String) entity.getAttribute(PROPERTY_OPERATION));
-        entry.setOperationParams((String) entity.getAttribute(PROPERTY_OPERATION_PARAMS));
-        entry.setStartTime((long) entity.getAttribute(PROPERTY_START_TIME));
-        entry.setEndTime((long) entity.getAttribute(PROPERTY_END_TIME));
-        entry.setSourceClusterName((String) entity.getAttribute(PROPERTY_SOURCE_CLUSTER_NAME));
-        entry.setTargetClusterName((String) entity.getAttribute(PROPERTY_TARGET_CLUSTER_NAME));
-        entry.setResultSummary((String) entity.getAttribute(PROPERTY_RESULT_SUMMARY));
+        entry.setGuid(guid);
+        entry.setUserName((String) attributes.get(PROPERTY_USER_NAME));
+        entry.setOperation((String) attributes.get(PROPERTY_OPERATION));
+        entry.setOperationParams((String) attributes.get(PROPERTY_OPERATION_PARAMS));
+        entry.setStartTime((long) attributes.get(PROPERTY_START_TIME));
+        entry.setEndTime((long) attributes.get(PROPERTY_END_TIME));
+        entry.setSourceClusterName((String) attributes.get(PROPERTY_SOURCE_CLUSTER_NAME));
+        entry.setTargetClusterName((String) attributes.get(PROPERTY_TARGET_CLUSTER_NAME));
+        entry.setResultSummary((String) attributes.get(PROPERTY_RESULT_SUMMARY));
 
         return entry;
     }
 
     @Override
+    public ExportImportAuditEntry from(AtlasEntity entity) {
+        return from(entity.getGuid(), entity.getAttributes());
+    }
+
+    @Override
     public ExportImportAuditEntry from(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
         return from(entityWithExtInfo.getEntity());
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index f692ac8..a5246c6 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1943,4 +1943,12 @@ public class EntityGraphMapper {
 
         type.getNormalizedValueForUpdate(classification);
     }
+
+    public static String getSoftRefFormattedValue(AtlasObjectId objectId) {
+        return getSoftRefFormattedString(objectId.getTypeName(), objectId.getGuid());
+    }
+
+    private static String getSoftRefFormattedString(String typeName, String resolvedGuid) {
+        return String.format(SOFT_REF_FORMAT, typeName, resolvedGuid);
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 2a385e3..d9be2f7 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import jnr.ffi.annotations.In;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TimeBoundary;
@@ -58,7 +59,9 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 
+import javax.inject.Inject;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -92,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
 import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
 
-
+@Component
 public final class EntityGraphRetriever {
     private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class);
 
@@ -116,6 +119,7 @@ public final class EntityGraphRetriever {
 
     private final boolean ignoreRelationshipAttr;
 
+    @Inject
     public EntityGraphRetriever(AtlasTypeRegistry typeRegistry) {
         this(typeRegistry, false);
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java
index c931e74..2e4481e 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ClusterServiceTest.java
@@ -20,11 +20,10 @@ package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.clusterinfo.AtlasCluster;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.impexp.ClusterService;
+import org.apache.atlas.model.impexp.AtlasCluster;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
@@ -35,18 +34,20 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.atlas.repository.Constants.ATTR_NAME_REFERENCEABLE;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class ClusterServiceTest {
     private final String TOP_LEVEL_ENTITY_NAME = "db1@cl1";
     private final String CLUSTER_NAME          = "testCl1";
     private final String TARGET_CLUSTER_NAME   = "testCl2";
+    private final String QUALIFIED_NAME_STOCKS = "stocks@cl1";
+    private final String TYPE_HIVE_DB          = "hive_db";
+    private final String topLevelEntityGuid    = "AAA-BBB-CCC";
 
     @Inject
     private AtlasTypeDefStore typeDefStore;
@@ -64,8 +65,8 @@ public class ClusterServiceTest {
 
     @Test
     public void saveAndRetrieveClusterInfo() throws AtlasBaseException {
-        AtlasCluster expected = getCluster(CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME);
-        AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME, TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME);
+        AtlasCluster expected = getCluster(CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "EXPORT", 0l, TARGET_CLUSTER_NAME);
+        AtlasCluster expected2 = getCluster(TARGET_CLUSTER_NAME + "_1", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0L, TARGET_CLUSTER_NAME);
         AtlasCluster expected3 = getCluster(TARGET_CLUSTER_NAME + "_3", TOP_LEVEL_ENTITY_NAME, "IMPORT", 0, TARGET_CLUSTER_NAME);
 
         AtlasCluster actual = clusterService.save(expected);
@@ -83,36 +84,38 @@ public class ClusterServiceTest {
 
         assertEquals(actual.getName(), expected.getName());
         assertEquals(actual.getQualifiedName(), expected.getQualifiedName());
-        assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION),
-                    getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.OPERATION));
-
-        assertEquals(getAdditionalInfo(actual, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP),
-                    getAdditionalInfo(expected, TOP_LEVEL_ENTITY_NAME).get(AtlasCluster.NEXT_MODIFIED_TIMESTAMP));
     }
 
-    private AtlasCluster getCluster(String name, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) {
-        AtlasCluster cluster = new AtlasCluster(name, name);
+    private AtlasCluster getCluster(String clusterName, String topLevelEntity, String operation, long nextModifiedTimestamp, String targetClusterName) {
+        AtlasCluster cluster = new AtlasCluster(clusterName, clusterName);
+
+        Map<String, String> syncMap = new HashMap<>();
 
-        Map<String, Object> syncMap = new HashMap<>();
+        syncMap.put("topLevelEntity", topLevelEntity);
         syncMap.put("operation", operation);
-        syncMap.put("nextModifiedTimestamp", nextModifiedTimestamp);
+        syncMap.put("nextModifiedTimestamp", Long.toString(nextModifiedTimestamp));
         syncMap.put("targetCluster", targetClusterName);
 
-        String syncMapJson = AtlasType.toJson(syncMap);
-        String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntity);
-        cluster.setAdditionalInfo(topLevelEntitySpecificKey, syncMapJson);
+        cluster.setAdditionalInfo(syncMap);
+
         return cluster;
     }
 
-    private Map<String, Object> getAdditionalInfo(AtlasCluster cluster, String topLevelEntityName) {
-        String topLevelEntitySpecificKey = getTopLevelEntitySpecificKey(topLevelEntityName);
-        assertTrue(cluster.getAdditionalInfo().containsKey(topLevelEntitySpecificKey));
+    @Test
+    public void verifyAdditionalInfo() throws AtlasBaseException {
+        final long expectedLastModifiedTimestamp = 200L;
+
+        AtlasCluster expectedCluster = new AtlasCluster(CLUSTER_NAME, CLUSTER_NAME);
 
-        String json = cluster.getAdditionalInfo(topLevelEntitySpecificKey);
-        return AtlasType.fromJson(json, Map.class);
-    }
+        String qualifiedNameAttr = Constants.QUALIFIED_NAME.replace(ATTR_NAME_REFERENCEABLE, "");
+        AtlasObjectId objectId = new AtlasObjectId(TYPE_HIVE_DB, qualifiedNameAttr, QUALIFIED_NAME_STOCKS);
+        expectedCluster.setAdditionalInfoRepl(topLevelEntityGuid, expectedLastModifiedTimestamp);
+
+        AtlasCluster actualCluster = clusterService.save(expectedCluster);
+        assertEquals(actualCluster.getName(), expectedCluster.getName());
+
+        int actualModifiedTimestamp = (int) actualCluster.getAdditionalInfoRepl(topLevelEntityGuid);
 
-    private String getTopLevelEntitySpecificKey(String topLevelEntity) {
-        return String.format("%s:%s", AtlasCluster.SYNC_INFO_KEY, topLevelEntity);
+        assertEquals(actualModifiedTimestamp, expectedLastModifiedTimestamp);
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java
index e019feb..13277a3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportAuditServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.TestModules;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.AtlasSearchResult;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasType;
@@ -31,14 +30,16 @@ import org.testng.annotations.Test;
 
 import javax.inject.Inject;
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
-public class ExportImportAuditServiceTest {
+public class ExportImportAuditServiceTest extends ExportImportTestBase {
     @Inject
     AtlasTypeRegistry typeRegistry;
 
@@ -69,7 +70,7 @@ public class ExportImportAuditServiceTest {
         String target2 = "clx1";
         ExportImportAuditEntry entry2 = saveAndGet(source2, ExportImportAuditEntry.OPERATION_EXPORT, target2);
 
-        Thread.sleep(1000);
+        pauseForIndexCreation();
         ExportImportAuditEntry actualEntry = retrieveEntry(entry);
         ExportImportAuditEntry actualEntry2 = retrieveEntry(entry2);
 
@@ -80,7 +81,7 @@ public class ExportImportAuditServiceTest {
         assertEquals(actualEntry.getOperation(), entry.getOperation());
     }
 
-    @Test(enabled = false)
+    @Test
     public void numberOfSavedEntries_Retrieved() throws AtlasBaseException, InterruptedException {
         final String source1 = "cluster1";
         final String target1 = "cly";
@@ -90,19 +91,20 @@ public class ExportImportAuditServiceTest {
             saveAndGet(source1, ExportImportAuditEntry.OPERATION_EXPORT, target1);
         }
 
-        Thread.sleep(5000);
-        AtlasSearchResult results = auditService.get(source1, ExportImportAuditEntry.OPERATION_EXPORT, "", "", "", "", 10, 0);
-        assertEquals(results.getEntities().size(), MAX_ENTRIES);
+        pauseForIndexCreation();
+        List<ExportImportAuditEntry> results = auditService.get("",
+                ExportImportAuditEntry.OPERATION_EXPORT,
+                "", "", "", 10, 0);
+        assertTrue(results.size() > 0);
     }
 
-
-    private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException, InterruptedException {
-        Thread.sleep(5000);
-        AtlasSearchResult result = auditService.get(entry.getUserName(), entry.getOperation(), entry.getSourceClusterName(),
-                                                            entry.getTargetClusterName(), Long.toString(entry.getStartTime()), "", 10, 0);
+    private ExportImportAuditEntry retrieveEntry(ExportImportAuditEntry entry) throws AtlasBaseException {
+        List<ExportImportAuditEntry> result = auditService.get(entry.getUserName(), entry.getOperation(),
+                                                            entry.getSourceClusterName(),
+                                                            Long.toString(entry.getStartTime()), "", 10, 0);
         assertNotNull(result);
-        assertEquals(result.getEntities().size(), 1);
-        entry.setGuid(result.getEntities().get(0).getGuid());
+        assertEquals(result.size(), 1);
+        entry.setGuid(result.get(0).getGuid());
         return auditService.get(entry);
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
index 79fd308..4b253ff 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportImportTestBase.java
@@ -22,7 +22,7 @@ import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
 import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
@@ -33,6 +33,7 @@ import org.testng.SkipException;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createAtlasEntity;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadBaseModel;
@@ -76,21 +77,28 @@ public class ExportImportTestBase {
         }
     }
 
-    protected void assertAuditEntry(ExportImportAuditService auditService) {
-        AtlasSearchResult result = null;
+    protected void assertAuditEntry(ExportImportAuditService auditService) throws InterruptedException {
+        pauseForIndexCreation();
+        List<ExportImportAuditEntry> result = null;
         try {
-            Thread.sleep(5000);
-            result = auditService.get("", "", "", "", "", "", 10, 0);
+            result = auditService.get("", "", "", "",  "", 10, 0);
         } catch (Exception e) {
-            throw new SkipException("auditService.get: failed!");
+            throw new SkipException("audit entries not retrieved.");
         }
 
         assertNotNull(result);
-        assertNotNull(result.getEntities());
-        assertTrue(result.getEntities().size() > 0);
+        assertTrue(result.size() > 0);
     }
 
     private String getCurrentCluster() throws AtlasException {
         return ApplicationProperties.get().getString(AtlasConstants.CLUSTER_NAME_KEY, "default");
     }
+
+    protected void pauseForIndexCreation() {
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException ex) {
+            throw new SkipException("pause interrupted.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index f86a463..b2dcf44 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -60,9 +60,6 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     ExportService exportService;
 
     @Inject
-    ClusterService clusterService;
-
-    @Inject
     private AtlasEntityStoreV2 entityStore;
 
     private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";

http://git-wip-us.apache.org/repos/asf/atlas/blob/6f10481d/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 7c62efb..c283258 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -108,7 +108,7 @@ public class ImportServiceTest extends ExportImportTestBase {
     }
 
     @AfterTest
-    public void postTest() {
+    public void postTest() throws InterruptedException {
         assertAuditEntry(auditService);
     }
 

Reply via email to