ATLAS-2874: Include handling of Atlas Entity Transformers in current Import logic


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e33b8bf1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e33b8bf1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e33b8bf1

Branch: refs/heads/branch-1.0
Commit: e33b8bf1faf478d8d1ae09eae9969d3012208cbc
Parents: dde9355
Author: Sarath Subramanian <ssubraman...@hortonworks.com>
Authored: Mon Sep 17 21:57:40 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700

----------------------------------------------------------------------
 .../atlas/model/impexp/AtlasImportRequest.java  |  1 +
 .../atlas/repository/impexp/ImportService.java  | 50 +++++++++++++++++--
 .../atlas/repository/impexp/ZipSource.java      | 51 ++++++++++++++++----
 3 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 2989fbe..06bc231 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable {
     private static final long   serialVersionUID = 1L;
 
     public  static final String TRANSFORMS_KEY             = "transforms";
+    public  static final String TRANSFORMERS_KEY           = "transformers";
     public  static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
     private static final String START_POSITION_KEY         = "startPosition";
     private static final String START_GUID_KEY             = "startGuid";

http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index a88ba2b..a09385e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -19,15 +19,17 @@ package org.apache.atlas.repository.impexp;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.entitytransform.BaseEntityHandler;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AttributeTransform;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.BulkImporter;
 import org.apache.atlas.store.AtlasTypeDefStore;
-import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -40,6 +42,11 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY;
+import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY;
 
 @Component
 public class ImportService {
@@ -82,9 +89,12 @@ public class ImportService {
         try {
             LOG.info("==> import(user={}, from={}, request={})", userName, 
requestingIP, request);
 
-            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
-
+            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(TRANSFORMS_KEY) : null;
             setImportTransform(source, transforms);
+
+            String transformers = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(TRANSFORMERS_KEY) : null;
+            setEntityTransformerHandlers(source, transformers);
+
             startTimestamp = System.currentTimeMillis();
             processTypes(source.getTypesDef(), result);
             setStartPosition(request, source);
@@ -121,6 +131,38 @@ public class ImportService {
 
     }
 
+    private void setEntityTransformerHandlers(ZipSource source, String 
transformersString) {
+        if (StringUtils.isEmpty(transformersString)) {
+            return;
+        }
+
+        Object transformersObj = AtlasType.fromJson(transformersString, 
Object.class);
+        List   transformers    = (transformersObj != null && transformersObj 
instanceof List) ? (List) transformersObj : null;
+
+        List<AttributeTransform> attributeTransforms = new ArrayList<>();
+
+        if (CollectionUtils.isNotEmpty(transformers)) {
+            for (Object transformer : transformers) {
+                String             transformerStr     = 
AtlasType.toJson(transformer);
+                AttributeTransform attributeTransform = 
AtlasType.fromJson(transformerStr, AttributeTransform.class);
+
+                if (attributeTransform == null) {
+                    continue;
+                }
+
+                attributeTransforms.add(attributeTransform);
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(attributeTransforms)) {
+            List<BaseEntityHandler> entityHandlers = 
BaseEntityHandler.createEntityHandlers(attributeTransforms);
+
+            if (CollectionUtils.isNotEmpty(entityHandlers)) {
+                source.setEntityHandlers(entityHandlers);
+            }
+        }
+    }
+
     private void debugLog(String s, Object... params) {
         if(!LOG.isDebugEnabled()) return;
 
@@ -148,7 +190,7 @@ public class ImportService {
         try {
             LOG.info("==> import(user={}, from={}, fileName={})", userName, 
requestingIP, fileName);
 
-            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null;
+            String transforms = MapUtils.isNotEmpty(request.getOptions()) ? 
request.getOptions().get(TRANSFORMS_KEY) : null;
             File file = new File(fileName);
             ZipSource source = new ZipSource(new 
ByteArrayInputStream(FileUtils.readFileToByteArray(file)), 
ImportTransforms.fromJson(transforms));
             result = run(source, request, userName, hostName, requestingIP);

http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
index 60cd9f8..1f436ce 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.impexp;
 
+import org.apache.atlas.entitytransform.BaseEntityHandler;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.impexp.AtlasExportResult;
 import org.apache.atlas.model.instance.AtlasEntity;
@@ -24,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +44,13 @@ import java.util.zip.ZipInputStream;
 public class ZipSource implements EntityImportStream {
     private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);
 
-    private final InputStream    inputStream;
-    private List<String>         creationOrder;
-    private Iterator<String>     iterator;
-    private Map<String, String>  guidEntityJsonMap;
-    private ImportTransforms     importTransform;
-    private int currentPosition;
+    private final InputStream       inputStream;
+    private List<String>            creationOrder;
+    private Iterator<String>        iterator;
+    private Map<String, String>     guidEntityJsonMap;
+    private ImportTransforms        importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+    private int                     currentPosition;
 
     public ZipSource(InputStream inputStream) throws IOException {
         this(inputStream, null);
@@ -68,6 +71,14 @@ public class ZipSource implements EntityImportStream {
         this.importTransform = importTransform;
     }
 
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
     public AtlasTypesDef getTypesDef() throws AtlasBaseException {
         final String fileName = 
ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();
 
@@ -123,17 +134,39 @@ public class ZipSource implements EntityImportStream {
         return this.creationOrder;
     }
 
-    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String 
guid) throws AtlasBaseException {
+    public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws 
AtlasBaseException {
         String s = getFromCache(guid);
-        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
+        AtlasEntityWithExtInfo entityWithExtInfo = 
convertFromJson(AtlasEntityWithExtInfo.class, s);
 
-        if (importTransform != null) {
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        } else if (importTransform != null) {
             entityWithExtInfo = importTransform.apply(entityWithExtInfo);
         }
 
         return entityWithExtInfo;
     }
 
+    private void applyTransformers(AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : 
entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
     private <T> T convertFromJson(Class<T> clazz, String jsonData) throws 
AtlasBaseException {
         T t;
         try {

Reply via email to