ATLAS-2874: Include handling of Atlas Entity Transformers in current Import logic
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e33b8bf1 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e33b8bf1 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e33b8bf1 Branch: refs/heads/branch-1.0 Commit: e33b8bf1faf478d8d1ae09eae9969d3012208cbc Parents: dde9355 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Mon Sep 17 21:57:40 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:55 2018 -0700 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasImportRequest.java | 1 + .../atlas/repository/impexp/ImportService.java | 50 +++++++++++++++++-- .../atlas/repository/impexp/ZipSource.java | 51 ++++++++++++++++---- 3 files changed, 89 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java index 2989fbe..06bc231 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -40,6 +40,7 @@ public class AtlasImportRequest implements Serializable { private static final long serialVersionUID = 1L; public static final String TRANSFORMS_KEY = "transforms"; + public static final String TRANSFORMERS_KEY = "transformers"; public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom"; private static final String START_POSITION_KEY = "startPosition"; private static final String START_GUID_KEY = "startGuid"; http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index a88ba2b..a09385e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -19,15 +19,17 @@ package org.apache.atlas.repository.impexp; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.entitytransform.BaseEntityHandler; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AttributeTransform; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -40,6 +42,11 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY; +import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMS_KEY; @Component public class ImportService { @@ -82,9 +89,12 @@ public class ImportService { try { LOG.info("==> import(user={}, from={}, request={})", userName, requestingIP, request); - String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null; - + String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null; setImportTransform(source, transforms); + + String transformers = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMERS_KEY) : null; + setEntityTransformerHandlers(source, transformers); + startTimestamp = System.currentTimeMillis(); processTypes(source.getTypesDef(), result); setStartPosition(request, source); @@ -121,6 +131,38 @@ public class ImportService { } + private void setEntityTransformerHandlers(ZipSource source, String transformersString) { + if (StringUtils.isEmpty(transformersString)) { + return; + } + + Object transformersObj = AtlasType.fromJson(transformersString, Object.class); + List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null; + + List<AttributeTransform> attributeTransforms = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(transformers)) { + for (Object transformer : transformers) { + String transformerStr = AtlasType.toJson(transformer); + AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class); + + if (attributeTransform == null) { + continue; + } + + attributeTransforms.add(attributeTransform); + } + } + + if (CollectionUtils.isNotEmpty(attributeTransforms)) { + List<BaseEntityHandler> entityHandlers = BaseEntityHandler.createEntityHandlers(attributeTransforms); + + if (CollectionUtils.isNotEmpty(entityHandlers)) { + source.setEntityHandlers(entityHandlers); + } + } + } + private void debugLog(String s, Object... params) { if(!LOG.isDebugEnabled()) return; @@ -148,7 +190,7 @@ public class ImportService { try { LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); - String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(AtlasImportRequest.TRANSFORMS_KEY) : null; + String transforms = MapUtils.isNotEmpty(request.getOptions()) ? request.getOptions().get(TRANSFORMS_KEY) : null; File file = new File(fileName); ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file)), ImportTransforms.fromJson(transforms)); result = run(source, request, userName, hostName, requestingIP); http://git-wip-us.apache.org/repos/asf/atlas/blob/e33b8bf1/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java index 60cd9f8..1f436ce 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.repository.impexp; +import org.apache.atlas.entitytransform.BaseEntityHandler; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasEntity; @@ -24,6 +25,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.v2.EntityImportStream; import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,12 +44,13 @@ import java.util.zip.ZipInputStream; public class ZipSource implements EntityImportStream { private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); - private final InputStream inputStream; - private List<String> creationOrder; - private Iterator<String> iterator; - private Map<String, String> guidEntityJsonMap; - private ImportTransforms importTransform; - private int currentPosition; + private final InputStream inputStream; + private List<String> creationOrder; + private Iterator<String> iterator; + private Map<String, String> guidEntityJsonMap; + private ImportTransforms importTransform; + private List<BaseEntityHandler> entityHandlers; + private int currentPosition; public ZipSource(InputStream inputStream) throws IOException { this(inputStream, null); @@ -68,6 +71,14 @@ public class ZipSource implements EntityImportStream { this.importTransform = importTransform; } + public List<BaseEntityHandler> getEntityHandlers() { + return entityHandlers; + } + + public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) { + this.entityHandlers = entityHandlers; + } + public AtlasTypesDef getTypesDef() throws AtlasBaseException { final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); @@ -123,17 +134,39 @@ public class ZipSource implements EntityImportStream { return this.creationOrder; } - public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { + public AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { String s = getFromCache(guid); - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s); + AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntityWithExtInfo.class, s); - if (importTransform != null) { + if (entityHandlers != null) { + applyTransformers(entityWithExtInfo); + } else if (importTransform != null) { entityWithExtInfo = importTransform.apply(entityWithExtInfo); } return entityWithExtInfo; } + private void applyTransformers(AtlasEntityWithExtInfo entityWithExtInfo) { + if (entityWithExtInfo == null) { + return; + } + + transform(entityWithExtInfo.getEntity()); + + if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + transform(e); + } + } + } + + private void transform(AtlasEntity e) { + for (BaseEntityHandler handler : entityHandlers) { + handler.transform(e); + } + } + private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { T t; try {