This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 08b76391cfb8fd231218e5788261e5f2310703cc Author: nikhilbonte <[email protected]> AuthorDate: Fri Jun 14 15:21:06 2019 +0530 ATLAS-3256 Modify export API to process with relationshipAttributes Signed-off-by: nixonrodrigues <[email protected]> --- .../atlas/repository/impexp/EntitiesExtractor.java | 81 +++++ .../atlas/repository/impexp/ExportService.java | 223 ++----------- .../atlas/repository/impexp/ExtractStrategy.java | 28 ++ .../impexp/IncrementalExportEntityProvider.java | 32 +- .../impexp/RelationshipAttributesExtractor.java | 115 +++++++ .../atlas/repository/impexp/VertexExtractor.java | 183 +++++++++++ .../IncrementalExportEntityProviderTest.java | 2 +- .../RelationshipAttributesExtractorTest.java | 354 +++++++++++++++++++++ 8 files changed, 821 insertions(+), 197 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java new file mode 100644 index 0000000..15cb111 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; + +import java.util.HashMap; +import java.util.Map; + +public class EntitiesExtractor { + static final String PROPERTY_GUID = "__guid"; + private static final String VERTEX_BASED_EXTRACT = "default"; + private static final String INCREMENTAL_EXTRACT = "incremental"; + private static final String RELATION_BASED_EXTRACT = "relationship"; + + private Map<String, ExtractStrategy> extractors = new HashMap<>(); + private ExtractStrategy extractor; + + public EntitiesExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) { + extractors.put(VERTEX_BASED_EXTRACT, new VertexExtractor(atlasGraph, typeRegistry)); + extractors.put(INCREMENTAL_EXTRACT, new IncrementalExportEntityProvider(atlasGraph)); + extractors.put(RELATION_BASED_EXTRACT, new RelationshipAttributesExtractor(typeRegistry)); + } + + public void get(AtlasEntity entity, ExportService.ExportContext context) { + if(extractor == null) { + extractor = extractors.get(VERTEX_BASED_EXTRACT); + } + + switch (context.fetchType) { + case CONNECTED: + extractor.connectedFetch(entity, context); + break; + + case INCREMENTAL: + if (context.isHiveDBIncrementalSkipLineage()) { + extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context); + break; + } + + case FULL: + default: + extractor.fullFetch(entity, context); + } + } + + public void setExtractor(AtlasEntityDef atlasEntityDef) { + extractor = extractUsing(atlasEntityDef); + } + + public void close() { + for (ExtractStrategy es : extractors.values()) { + es.close(); + } + } + + private ExtractStrategy extractUsing(AtlasEntityDef atlasEntityDef) { + return (atlasEntityDef == null || atlasEntityDef.getRelationshipAttributeDefs().size() == 0) + ? 
extractors.get(VERTEX_BASED_EXTRACT) + : extractors.get(RELATION_BASED_EXTRACT); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 11289ea..5055607 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -25,7 +25,6 @@ import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasEnumDef; @@ -35,19 +34,13 @@ import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.util.UniqueList; -import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.util.AtlasGremlinQueryProvider; -import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; -import javax.script.ScriptEngine; -import javax.script.ScriptException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -63,30 +56,23 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREM public class ExportService { private static final Logger LOG = 
LoggerFactory.getLogger(ExportService.class); - public static final String PROPERTY_GUID = "__guid"; - private static final String PROPERTY_IS_PROCESS = "isProcess"; - private final AtlasTypeRegistry typeRegistry; - private final String QUERY_BINDING_START_GUID = "startGuid"; private final StartEntityFetchByExportRequest startEntityFetchByExportRequest; + private final EntitiesExtractor entitiesExtractor; private AuditsWriter auditsWriter; - private final AtlasGraph atlasGraph; private final EntityGraphRetriever entityGraphRetriever; - private final AtlasGremlinQueryProvider gremlinQueryProvider; private ExportTypeProcessor exportTypeProcessor; private final HdfsPathEntityCreator hdfsPathEntityCreator; - private IncrementalExportEntityProvider incrementalExportEntityProvider; @Inject public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); - this.atlasGraph = atlasGraph; - this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.auditsWriter = auditsWriter; this.hdfsPathEntityCreator = hdfsPathEntityCreator; this.startEntityFetchByExportRequest = new StartEntityFetchByExportRequest(atlasGraph, typeRegistry, AtlasGremlinQueryProvider.INSTANCE); + this.entitiesExtractor = new EntitiesExtractor(atlasGraph, typeRegistry); } public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, @@ -95,7 +81,7 @@ public class ExportService { AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime, getCurrentChangeMarker()); - ExportContext context = new ExportContext(atlasGraph, result, exportSink); + ExportContext context = new ExportContext(result, exportSink); exportTypeProcessor = new ExportTypeProcessor(typeRegistry); try { @@ -109,12 +95,12 @@ public class 
ExportService { } catch(Exception ex) { LOG.error("Operation failed: ", ex); } finally { - atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); + entitiesExtractor.close(); + LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}", userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker()); context.clear(); result.clear(); - incrementalExportEntityProvider = null; } return context.result; @@ -203,7 +189,9 @@ public class ExportService { } private AtlasExportResult.OperationStatus processObjectId(AtlasObjectId item, ExportContext context) { - debugLog("==> processObjectId({})", item); + if (LOG.isDebugEnabled()) { + LOG.debug("==> processObjectId({})", item); + } try { List<String> entityGuids = getStartingEntity(item, context); @@ -211,9 +199,10 @@ public class ExportService { return AtlasExportResult.OperationStatus.FAIL; } + entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName())); + for (String guid : entityGuids) { processEntityGuid(guid, context); - populateEntitesForIncremental(guid, context); } while (!context.guidsToProcess.isEmpty()) { @@ -227,13 +216,16 @@ public class ExportService { context.lineageProcessed.addAll(context.lineageToProcess.getList()); context.lineageToProcess.clear(); } + context.isSkipConnectedFetch = false; } } catch (AtlasBaseException excp) { LOG.error("Fetching entity failed for: {}", item, excp); return AtlasExportResult.OperationStatus.FAIL; } - debugLog("<== processObjectId({})", item); + if (LOG.isDebugEnabled()) { + LOG.debug("<== processObjectId({})", item); + } return AtlasExportResult.OperationStatus.SUCCESS; } @@ -245,181 +237,41 @@ public class ExportService { return startEntityFetchByExportRequest.get(context.result.getRequest(), item); } - private void debugLog(String s, Object... 
params) { - if (!LOG.isDebugEnabled()) { - return; - } - - LOG.debug(s, params); - } - private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException { - debugLog("==> processEntityGuid({})", guid); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> processEntityGuid({})", guid); + } if (context.guidsProcessed.contains(guid)) { return; } - TraversalDirection direction = context.guidDirection.get(guid); AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - processEntity(entityWithExtInfo, context, direction); - - debugLog("<== processEntityGuid({})", guid); + processEntity(entityWithExtInfo, context); + if (LOG.isDebugEnabled()) { + LOG.debug("<== processEntityGuid({})", guid); + } } - public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, - ExportContext context, - TraversalDirection direction) throws AtlasBaseException { - + public void processEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException { addEntity(entityWithExtInfo, context); exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); - getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); + entitiesExtractor.get(entityWithExtInfo.getEntity(), context); if (entityWithExtInfo.getReferredEntities() != null) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { exportTypeProcessor.addTypes(e, context); - getConntedEntitiesBasedOnOption(e, context, direction); + entitiesExtractor.get(e, context); } context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); } } - private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) { - switch (context.fetchType) { - case CONNECTED: - getEntityGuidsForConnectedFetch(entity, context, direction); - break; - - case INCREMENTAL: - 
if(context.isHiveDBIncrementalSkipLineage()) { - break; - } - - case FULL: - default: - getEntityGuidsForFullFetch(entity, context); - } - } - - private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) { - if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) { - return; - } - - incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine); - incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess); - } - - private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) { - if (direction == null || direction == TraversalDirection.UNKNOWN) { - getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); - } else { - if (isProcessEntity(entity)) { - direction = TraversalDirection.OUTWARD; - } - - getConnectedEntityGuids(entity, context, direction); - } - } - - private boolean isProcessEntity(AtlasEntity entity) { - String typeName = entity.getTypeName(); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - - return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); - } - - private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... 
directions) { - if(directions == null) { - return; - } - - for (TraversalDirection direction : directions) { - String query = getQueryForTraversalDirection(direction); - - if(LOG.isDebugEnabled()) { - debugLog("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); - } - - context.bindings.clear(); - context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid()); - - List<Map<String, Object>> result = executeGremlinQuery(query, context); - - if (CollectionUtils.isEmpty(result)) { - continue; - } - - for (Map<String, Object> hashMap : result) { - String guid = (String) hashMap.get(PROPERTY_GUID); - TraversalDirection currentDirection = context.guidDirection.get(guid); - boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); - - if(context.skipLineage && isLineage) continue; - - if (currentDirection == null) { - context.addToBeProcessed(isLineage, guid, direction); - - } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { - // the entity should be reprocessed to get inward entities - context.guidsProcessed.remove(guid); - context.addToBeProcessed(isLineage, guid, direction); - } - } - - if(LOG.isDebugEnabled()) { - debugLog("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); - } - } - } - - private String getQueryForTraversalDirection(TraversalDirection direction) { - switch (direction) { - case INWARD: - return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE); - - default: - case OUTWARD: - return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE); - } - } - - private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { - if(LOG.isDebugEnabled()) { - debugLog("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", 
AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); - } - - String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); - - context.bindings.clear(); - context.bindings.put(QUERY_BINDING_START_GUID, entity.getGuid()); - - List<Map<String, Object>> result = executeGremlinQuery(query, context); - - if (CollectionUtils.isEmpty(result)) { - return; - } - - for (Map<String, Object> hashMap : result) { - String guid = (String) hashMap.get(PROPERTY_GUID); - boolean isLineage = (boolean) hashMap.get(PROPERTY_IS_PROCESS); - - if(context.getSkipLineage() && isLineage) continue; - - if (!context.guidsProcessed.contains(guid)) { - context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); - } - } - - if(LOG.isDebugEnabled()) { - debugLog("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", - entity.getGuid(), result.size(), context.guidsToProcess.size()); - } - } private void addEntity(AtlasEntityWithExtInfo entityWithExtInfo, ExportContext context) throws AtlasBaseException { if(context.sink.hasEntity(entityWithExtInfo.getEntity().getGuid())) { @@ -448,15 +300,6 @@ public class ExportService { context.reportProgress(); } - private List<Map<String, Object>> executeGremlinQuery(String query, ExportContext context) { - try { - return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); - } catch (ScriptException e) { - LOG.error("Script execution failed for query: ", query, e); - return null; - } - } - public enum TraversalDirection { UNKNOWN, INWARD, @@ -493,7 +336,7 @@ public class ExportService { final UniqueList<String> entityCreationOrder = new UniqueList<>(); final Set<String> guidsProcessed = new HashSet<>(); - final private UniqueList<String> guidsToProcess = new UniqueList<>(); + final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> lineageToProcess = new UniqueList<>(); final Set<String> 
lineageProcessed = new HashSet<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); @@ -505,25 +348,23 @@ public class ExportService { final AtlasExportResult result; private final ZipSink sink; - private final ScriptEngine scriptEngine; - private final Map<String, Object> bindings; - private final ExportFetchType fetchType; - private final boolean skipLineage; - private final long changeMarker; + final ExportFetchType fetchType; + final boolean skipLineage; + final long changeMarker; + boolean isSkipConnectedFetch; private final boolean isHiveDBIncremental; private int progressReportCount = 0; - ExportContext(AtlasGraph atlasGraph, AtlasExportResult result, ZipSink sink) throws AtlasBaseException { + ExportContext(AtlasExportResult result, ZipSink sink) { this.result = result; this.sink = sink; - scriptEngine = atlasGraph.getGremlinScriptEngine(); - bindings = new HashMap<>(); fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue()); skipLineage = result.getRequest().getSkipLineageOptionValue(); this.changeMarker = result.getRequest().getChangeTokenFromOptions(); this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest()); + this.isSkipConnectedFetch = false; } private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) { diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java new file mode 100644 index 0000000..6475016 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExtractStrategy.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.model.instance.AtlasEntity; + +public interface ExtractStrategy { + + void connectedFetch(AtlasEntity entity, ExportService.ExportContext context); + void fullFetch(AtlasEntity entity, ExportService.ExportContext context); + void close(); +} diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java index 3a2a917..256d9de 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java @@ -18,6 +18,8 @@ package org.apache.atlas.repository.impexp; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.util.UniqueList; import org.slf4j.Logger; @@ -28,11 +30,10 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -public class IncrementalExportEntityProvider { +public class IncrementalExportEntityProvider implements ExtractStrategy { 
private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class); private static final String QUERY_PARAMETER_START_GUID = "startGuid"; @@ -50,9 +51,23 @@ public class IncrementalExportEntityProvider { private ScriptEngine scriptEngine; @Inject - public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) { + public IncrementalExportEntityProvider(AtlasGraph atlasGraph) { this.atlasGraph = atlasGraph; - this.scriptEngine = scriptEngine; + try { + this.scriptEngine = atlasGraph.getGremlinScriptEngine(); + } catch (AtlasBaseException e) { + LOG.error("Error instantiating script engine.", e); + } + } + + @Override + public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) { + populate(entity.getGuid(), context.changeMarker, context.guidsToProcess); + } + + @Override + public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) { + } public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) { @@ -63,6 +78,13 @@ public class IncrementalExportEntityProvider { } } + @Override + public void close() { + if (scriptEngine != null) { + atlasGraph.releaseGremlinScriptEngine(scriptEngine); + } + } + private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) { guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp)); guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp)); @@ -98,7 +120,7 @@ public class IncrementalExportEntityProvider { } for (Map<String, Object> item : result) { - guids.add((String) item.get(ExportService.PROPERTY_GUID)); + guids.add((String) item.get(EntitiesExtractor.PROPERTY_GUID)); } return guids; diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java new file mode 100644 index 0000000..d609071 --- 
/dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractor.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class RelationshipAttributesExtractor implements ExtractStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(RelationshipAttributesExtractor.class); + + private final AtlasTypeRegistry typeRegistry; + + public RelationshipAttributesExtractor(AtlasTypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + } + + @Override + public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); 
+ } + + List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity); + + for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) { + boolean isLineage = isLineageType(ar.getTypeName()); + + if (context.skipLineage && isLineage) { + continue; + } + context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== fullFetch({}): guidsToProcess {}", entity.getGuid(), context.guidsToProcess.size()); + } + } + + @Override + public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> connectedFetch({}): guidsToProcess {} isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch); + } + + List<AtlasRelatedObjectId> atlasRelatedObjectIdList = getRelatedObjectIds(entity); + for (AtlasRelatedObjectId ar : atlasRelatedObjectIdList) { + boolean isLineage = isLineageType(ar.getTypeName()); + + if (context.skipLineage && isLineage) { + continue; + } + if (!context.isSkipConnectedFetch || isLineage) { + context.addToBeProcessed(isLineage, ar.getGuid(), ExportService.TraversalDirection.BOTH); + } + } + + if(isLineageType(entity.getTypeName())){ + context.isSkipConnectedFetch = false; + }else{ + context.isSkipConnectedFetch = true; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("==> connectedFetch({}): guidsToProcess {}, isSkipConnectedFetch :{}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), context.isSkipConnectedFetch); + } + } + + @Override + public void close() { + } + + private boolean isLineageType(String typeName) { + AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(typeName); + return entityDef.getSuperTypes().contains("Process"); + } + + private List<AtlasRelatedObjectId> getRelatedObjectIds(AtlasEntity entity) { + List<AtlasRelatedObjectId> relatedObjectIds = new ArrayList<>(); + + for (Object o : 
entity.getRelationshipAttributes().values()) { + if (o instanceof AtlasRelatedObjectId) { + relatedObjectIds.add((AtlasRelatedObjectId) o); + } else if (o instanceof Collection) { + relatedObjectIds.addAll((List) o); + } + } + + return relatedObjectIds; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java new file mode 100644 index 0000000..a5b11be --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/VertexExtractor.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.util.AtlasGremlinQueryProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.repository.impexp.EntitiesExtractor.PROPERTY_GUID;
+
+/**
+ * {@link ExtractStrategy} that discovers the entities to export by running
+ * Gremlin queries, starting from the guid of the entity being processed.
+ *
+ * <p>Each query result row is read as a map holding the entity guid
+ * ({@code PROPERTY_GUID}) and an {@code isProcess} flag; matching guids are
+ * queued on the {@link ExportService.ExportContext}.
+ *
+ * <p>NOTE(review): the query bindings map is instance state that is cleared and
+ * refilled before every query, so an instance presumably must not be shared
+ * between concurrent exports - confirm against the caller.
+ */
+public class VertexExtractor implements ExtractStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(VertexExtractor.class);
+
+    private static final String PROPERTY_IS_PROCESS      = "isProcess";
+    private static final String QUERY_BINDING_START_GUID = "startGuid";
+
+    private final AtlasGremlinQueryProvider gremlinQueryProvider;
+
+    private final Map<String, Object> bindings;
+    private final AtlasGraph          atlasGraph;
+    private final AtlasTypeRegistry   typeRegistry;
+    private ScriptEngine              scriptEngine;
+
+    /**
+     * Creates the extractor and acquires a Gremlin script engine from the graph.
+     *
+     * <p>If the engine cannot be obtained the failure is logged (with its cause)
+     * and the extractor is still constructed, as before; {@link #close()} then
+     * has nothing to release.
+     *
+     * @param atlasGraph   graph used to obtain the script engine and run queries
+     * @param typeRegistry registry used to resolve entity types
+     */
+    public VertexExtractor(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
+        this.atlasGraph   = atlasGraph;
+        this.typeRegistry = typeRegistry;
+
+        try {
+            this.scriptEngine = atlasGraph.getGremlinScriptEngine();
+        } catch (AtlasBaseException e) {
+            // Pass the exception along so the reason for the failure is not lost.
+            LOG.error("Script Engine: Instantiation failed!", e);
+        }
+
+        this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE;
+        this.bindings             = new HashMap<>();
+    }
+
+    /**
+     * Runs the {@code EXPORT_BY_GUID_FULL} query for the entity and queues every
+     * returned guid that has not been processed yet, with direction {@code BOTH}.
+     * Rows flagged as process are skipped when the context asks to skip lineage.
+     *
+     * @param entity  entity whose guid is the query start point
+     * @param context export state that receives the guids to process
+     */
+    @Override
+    public void fullFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> fullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        String query = gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_FULL);
+
+        bindings.clear();
+        bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+        List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+        if (CollectionUtils.isEmpty(result)) {
+            return;
+        }
+
+        for (Map<String, Object> row : result) {
+            String  guid      = (String) row.get(PROPERTY_GUID);
+            boolean isLineage = isProcessRow(row);
+
+            if (context.getSkipLineage() && isLineage) {
+                continue;
+            }
+
+            if (!context.guidsProcessed.contains(guid)) {
+                context.addToBeProcessed(isLineage, guid, ExportService.TraversalDirection.BOTH);
+            }
+        }
+    }
+
+    /**
+     * Queues the entities connected to the given entity.
+     *
+     * <p>When no direction (or {@code UNKNOWN}) is recorded for the entity, both
+     * the outward and the inward queries are run. Otherwise the recorded
+     * direction is followed, except that process entities are always traversed
+     * outward.
+     *
+     * @param entity  entity whose neighbours are fetched
+     * @param context export state that holds the recorded directions and receives the guids
+     */
+    @Override
+    public void connectedFetch(AtlasEntity entity, ExportService.ExportContext context) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> connectedFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size());
+        }
+
+        ExportService.TraversalDirection direction = context.guidDirection.get(entity.getGuid());
+
+        if (direction == null || direction == ExportService.TraversalDirection.UNKNOWN) {
+            getConnectedEntityGuids(entity, context, ExportService.TraversalDirection.OUTWARD, ExportService.TraversalDirection.INWARD);
+        } else {
+            if (isProcessEntity(entity)) {
+                direction = ExportService.TraversalDirection.OUTWARD;
+            }
+
+            getConnectedEntityGuids(entity, context, direction);
+        }
+    }
+
+    /**
+     * Returns the script engine to the graph, if one was acquired.
+     */
+    @Override
+    public void close() {
+        if (scriptEngine != null) {
+            atlasGraph.releaseGremlinScriptEngine(scriptEngine);
+        }
+    }
+
+    /**
+     * Runs the connected-entity query once per requested direction and queues the
+     * guids found.
+     */
+    private void getConnectedEntityGuids(AtlasEntity entity, ExportService.ExportContext context, ExportService.TraversalDirection... directions) {
+        if (directions == null) {
+            return;
+        }
+
+        for (ExportService.TraversalDirection direction : directions) {
+            String query = getQueryForTraversalDirection(direction);
+
+            bindings.clear();
+            bindings.put(QUERY_BINDING_START_GUID, entity.getGuid());
+
+            List<Map<String, Object>> result = executeGremlinQuery(query, context);
+
+            if (CollectionUtils.isEmpty(result)) {
+                continue;
+            }
+
+            for (Map<String, Object> row : result) {
+                String                           guid             = (String) row.get(PROPERTY_GUID);
+                ExportService.TraversalDirection currentDirection = context.guidDirection.get(guid);
+                boolean                          isLineage        = isProcessRow(row);
+
+                // Same accessor as fullFetch, instead of reading the field directly.
+                if (context.getSkipLineage() && isLineage) {
+                    continue;
+                }
+
+                if (currentDirection == null) {
+                    context.addToBeProcessed(isLineage, guid, direction);
+                } else if (currentDirection == ExportService.TraversalDirection.OUTWARD && direction == ExportService.TraversalDirection.INWARD) {
+                    // the entity should be reprocessed to get inward entities
+                    context.guidsProcessed.remove(guid);
+                    context.addToBeProcessed(isLineage, guid, direction);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads the {@code isProcess} flag of a result row; a missing or non-Boolean
+     * value counts as {@code false} instead of failing on unboxing.
+     */
+    private boolean isProcessRow(Map<String, Object> row) {
+        return Boolean.TRUE.equals(row.get(PROPERTY_IS_PROCESS));
+    }
+
+    /**
+     * Tells whether the entity's type is a sub-type of the base Process type.
+     * A type name the registry does not return a type for is treated as not a process.
+     */
+    private boolean isProcessEntity(AtlasEntity entity) {
+        String          typeName   = entity.getTypeName();
+        AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+
+        return entityType != null && entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS);
+    }
+
+    /**
+     * Picks the in-edge query for {@code INWARD}; every other direction uses the
+     * out-edge query.
+     */
+    private String getQueryForTraversalDirection(ExportService.TraversalDirection direction) {
+        switch (direction) {
+            case INWARD:
+                return gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE);
+
+            case OUTWARD:
+            default:
+                return gremlinQueryProvider.getQuery(AtlasGremlinQueryProvider.AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE);
+        }
+    }
+
+    /**
+     * Executes the query with the current bindings.
+     *
+     * @return the result rows, or {@code null} when script execution fails
+     *         (callers treat {@code null} like an empty result)
+     */
+    @SuppressWarnings("unchecked") // the result is read as a list of property maps
+    private List<Map<String, Object>> executeGremlinQuery(String query, ExportService.ExportContext context) {
+        try {
+            return (List<Map<String, Object>>) atlasGraph.executeGremlinScript(scriptEngine, bindings, query, false);
+        } catch (ScriptException e) {
+            // "{}" is required for the query text to appear; the trailing exception is logged with its stack trace.
+            LOG.error("Script execution failed for query: {}", query, e);
+            return null;
+        }
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
index 85ed5f9..10a0838 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -63,7 +63,7 @@ public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
         verifyCreatedEntities(entityStore, entityGuids, 2);
 
         gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
-        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+        incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph);
     }
 
     @AfterClass
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
new file mode 100644
index 0000000..03d50f1
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/RelationshipAttributesExtractorTest.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.TestUtilsV2;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.runner.LocalSolrRunner;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.ITestContext;
+import org.testng.annotations.Test;
+import org.testng.annotations.Guice;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.DataProvider;
+
+import javax.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
+import static org.testng.Assert.*;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Imports {@code hive_db_lineage.zip} and then exports the hive database and two
+ * of its tables with every combination of fetch type ({@code full} /
+ * {@code connected}) and {@code skipLineage}, checking which entity guids end up
+ * in the exported archive.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class RelationshipAttributesExtractorTest {
+
+    private static final String EXPORT_FULL                      = "full";
+    private static final String EXPORT_CONNECTED                 = "connected";
+    private static final String QUALIFIED_NAME_DB                = "db_test_1@02052019";
+    private static final String QUALIFIED_NAME_TABLE_LINEAGE     = "db_test_1.test_tbl_ctas_2@02052019";
+    private static final String QUALIFIED_NAME_TABLE_NON_LINEAGE = "db_test_1.test_tbl_1@02052019";
+
+    private static final String GUID_DB           = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
+    private static final String GUID_TABLE_1      = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
+    private static final String GUID_TABLE_2      = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
+    private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+    private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";
+
+    @Inject
+    private ImportService importService;
+
+    @Inject
+    AtlasTypeRegistry typeRegistry;
+
+    @Inject
+    private AtlasTypeDefStore typeDefStore;
+
+    @Inject
+    private ExportService exportService;
+
+    @BeforeClass
+    public void setup() throws IOException, AtlasBaseException {
+        loadBaseModel();
+        loadHiveModel();
+    }
+
+    @BeforeTest
+    public void setupTest() {
+        RequestContext.clear();
+        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
+    }
+
+    @AfterClass
+    public void clear() throws Exception {
+        AtlasGraphProvider.cleanup();
+
+        if (useLocalSolr()) {
+            LocalSolrRunner.stop();
+        }
+    }
+
+    @DataProvider(name = "hiveDb")
+    public static Object[][] getData(ITestContext context) throws IOException, AtlasBaseException {
+        return getZipSource("hive_db_lineage.zip");
+    }
+
+    @Test(dataProvider = "hiveDb")
+    public void importHiveDb(ZipSource zipSource) throws AtlasBaseException, IOException {
+        runImportWithNoParameters(importService, zipSource);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, false));
+        verifyExport(source, true, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBFullSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_FULL, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, false));
+        verifyExport(source, true, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_FULL, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, false));
+        verifyExport(source, true, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageFull() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_FULL, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, false));
+        verifyExport(source, true, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportDBSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveDb(QUALIFIED_NAME_DB, EXPORT_CONNECTED, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, false));
+        verifyExport(source, true, GUID_DB, GUID_TABLE_2, GUID_TABLE_CTAS_2, GUID_HIVE_PROCESS);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_CONNECTED, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_CTAS_2);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, false));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1);
+    }
+
+    @Test(dependsOnMethods = "importHiveDb")
+    public void exportTableWithoutLineageSkipLineageConn() throws Exception {
+        ZipSource source = runExport(getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_NON_LINEAGE, EXPORT_CONNECTED, true));
+        verifyExport(source, false, GUID_DB, GUID_TABLE_1);
+    }
+
+    private void loadHiveModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
+    }
+
+    private void loadBaseModel() throws IOException, AtlasBaseException {
+        loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
+    }
+
+    private AtlasExportRequest getExportRequestForHiveDb(String hiveDbName, String fetchType, boolean skipLineage) {
+        return getExportRequest("hive_db", hiveDbName, fetchType, skipLineage);
+    }
+
+    private AtlasExportRequest getExportRequestForHiveTable(String hiveTableName, String fetchType, boolean skipLineage) {
+        return getExportRequest("hive_table", hiveTableName, fetchType, skipLineage);
+    }
+
+    /**
+     * Builds an export request for a single entity identified by its
+     * {@code qualifiedName}.
+     */
+    private AtlasExportRequest getExportRequest(String typeName, String qualifiedName, String fetchType, boolean skipLineage) {
+        AtlasExportRequest request = new AtlasExportRequest();
+
+        List<AtlasObjectId> itemsToExport = new ArrayList<>();
+        itemsToExport.add(new AtlasObjectId(typeName, "qualifiedName", qualifiedName));
+        request.setItemsToExport(itemsToExport);
+        request.setOptions(getOptionsMap(fetchType, skipLineage));
+
+        return request;
+    }
+
+    /**
+     * Builds the export options; a null or empty fetch type falls back to
+     * {@code "full"}.
+     */
+    private Map<String, Object> getOptionsMap(String fetchType, boolean skipLineage) {
+        Map<String, Object> optionsMap = new HashMap<>();
+        optionsMap.put("fetchType", (fetchType == null || fetchType.isEmpty()) ? EXPORT_FULL : fetchType);
+        optionsMap.put("skipLineage", skipLineage);
+
+        return optionsMap;
+    }
+
+    /**
+     * Runs the export into an in-memory zip and reopens the bytes as a
+     * {@link ZipSource} so the exported content can be inspected.
+     */
+    private ZipSource runExport(AtlasExportRequest request) throws AtlasBaseException, IOException {
+        final String requestingIP = "1.0.0.0";
+        final String hostName     = "localhost";
+        final String userName     = "admin";
+
+        ByteArrayOutputStream baos    = new ByteArrayOutputStream();
+        ZipSink               zipSink = new ZipSink(baos);
+
+        exportService.run(zipSink, request, userName, hostName, requestingIP);
+
+        zipSink.close();
+
+        return new ZipSource(new ByteArrayInputStream(baos.toByteArray()));
+    }
+
+    /**
+     * Shared verification for every export scenario.
+     *
+     * @param zipSource         exported archive
+     * @param expectHiveProcess whether the creation order must contain the hive_process guid
+     * @param expectedGuids     exactly the entity guids the archive must contain
+     */
+    private void verifyExport(ZipSource zipSource, boolean expectHiveProcess, String... expectedGuids) {
+        List<String> creationOrder = zipSource.getCreationOrder();
+
+        assertNotNull(creationOrder);
+        assertEquals(creationOrder.size(), expectedGuids.length);
+        assertEquals(creationOrder.contains(GUID_HIVE_PROCESS), expectHiveProcess, "hive_process in creation order");
+
+        verifyExpectedEntities(getFileNames(zipSource), expectedGuids);
+    }
+
+    private void verifyExpectedEntities(List<String> fileNames, String... guids) {
+        assertEquals(fileNames.size(), guids.length);
+
+        for (String guid : guids) {
+            assertTrue(fileNames.contains(guid.toLowerCase()), "exported entities should contain " + guid);
+        }
+    }
+
+    /**
+     * Drains the source and returns the guid of every entity it yields; the
+     * source must yield at least one entity.
+     */
+    private List<String> getFileNames(ZipSource zipSource) {
+        List<String> ret = new ArrayList<>();
+        assertTrue(zipSource.hasNext());
+
+        while (zipSource.hasNext()) {
+            AtlasEntity atlasEntity = zipSource.next();
+            assertNotNull(atlasEntity);
+            ret.add(atlasEntity.getGuid());
+        }
+
+        return ret;
+    }
+}
