ATLAS-1665: export optimization to reduce file-size and export-time Signed-off-by: Madhan Neethiraj <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/160b2874 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/160b2874 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/160b2874 Branch: refs/heads/master Commit: 160b28740b0689d7892299fd246394508c28d2dd Parents: 537f6e3 Author: ashutoshm <[email protected]> Authored: Thu Mar 16 22:03:42 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Sat Mar 18 14:48:28 2017 -0700 ---------------------------------------------------------------------- .../atlas/model/instance/AtlasEntity.java | 2 +- .../graph/v1/AtlasEntityGraphDiscoveryV1.java | 24 +- .../store/graph/v1/AtlasEntityStoreV1.java | 7 +- .../store/graph/v1/AtlasEntityStream.java | 14 +- .../graph/v1/AtlasEntityStreamForImport.java | 26 ++- .../store/graph/v1/EntityImportStream.java | 4 + .../repository/store/graph/v1/EntityStream.java | 1 - .../store/graph/v1/InMemoryMapEntityStream.java | 2 - .../atlas/util/AtlasGremlin2QueryProvider.java | 6 +- .../atlas/web/resources/ExportService.java | 234 ++++++++++++------- .../org/apache/atlas/web/resources/ZipSink.java | 5 + .../apache/atlas/web/resources/ZipSource.java | 42 ++-- 12 files changed, 236 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java index 4e3895d..0e277b1 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java @@ -196,6 +196,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable { } sb.append("AtlasEntity{"); + super.toString(sb); sb.append("guid='").append(guid).append('\''); sb.append(", status=").append(status); sb.append(", createdBy='").append(createdBy).append('\''); @@ -207,7 +208,6 @@ public class AtlasEntity extends AtlasStruct implements Serializable { AtlasBaseTypeDef.dumpObjects(classifications, sb); sb.append(']'); sb.append(", "); - super.toString(sb); sb.append('}'); return sb; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java index 6c88510..12e8bb1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityGraphDiscoveryV1.java @@ -17,14 +17,6 @@ */ package org.apache.atlas.repository.store.graph.v1; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; @@ -34,12 +26,26 @@ import org.apache.atlas.model.instance.AtlasStruct; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityResolver; -import org.apache.atlas.type.*; +import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class AtlasEntityGraphDiscoveryV1 implements EntityGraphDiscovery { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityGraphDiscoveryV1.class); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 86fd6ad..fa4c051 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -159,13 +159,14 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { int progressReportedAtCount = 0; while (entityStream.hasNext()) { - AtlasEntity entity = entityStream.next(); + AtlasEntityWithExtInfo entityWithExtInfo = entityStream.getNextEntityWithExtInfo(); + AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; if(entity == null || processedGuids.contains(entity.getGuid())) { continue; } - AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream); + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream); EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); @@ -177,7 +178,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); - if ((processedGuids.size() - progressReportedAtCount) > 10) { + if ((processedGuids.size() - progressReportedAtCount) > 1000) { progressReportedAtCount = processedGuids.size(); LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java index 5d9a7d4..eb860ff 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java @@ -24,9 +24,9 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import java.util.Iterator; public class AtlasEntityStream implements EntityStream { - private final AtlasEntitiesWithExtInfo entitiesWithExtInfo; - private final EntityStream entityStream; - private Iterator<AtlasEntity> iterator; + protected final AtlasEntitiesWithExtInfo entitiesWithExtInfo; + protected final EntityStream entityStream; + private Iterator<AtlasEntity> iterator; public AtlasEntityStream(AtlasEntity entity) { @@ -49,6 +49,12 @@ public class AtlasEntityStream implements EntityStream { this.entityStream = entityStream; } + public AtlasEntityStream(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) { + this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entityWithExtInfo); + this.iterator = this.entitiesWithExtInfo.getEntities().iterator(); + this.entityStream = entityStream; + } + @Override public boolean hasNext() { return iterator.hasNext(); @@ -66,7 +72,7 @@ public class AtlasEntityStream implements EntityStream { @Override public AtlasEntity getByGuid(String guid) { - return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid); + return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid); } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java index 8cb36ac..69140e6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java @@ -18,17 +18,29 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; - -import java.util.List; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { - public AtlasEntityStreamForImport(AtlasEntity entity) { - super(entity); + public AtlasEntityStreamForImport(AtlasEntityWithExtInfo entityWithExtInfo, EntityStream entityStream) { + super(entityWithExtInfo, entityStream); + } + + @Override + public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { + AtlasEntity entity = next(); + + return entity != null ? new AtlasEntityWithExtInfo(entity, super.entitiesWithExtInfo) : null; } - public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) { - super(entity, entityStream); + @Override + public AtlasEntity getByGuid(String guid) { + AtlasEntity ent = super.entitiesWithExtInfo.getEntity(guid); + + if(ent == null && entityStream != null) { + return entityStream.getByGuid(guid); + } + + return ent; } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java index 73994b9..0f711db 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java @@ -18,7 +18,11 @@ package org.apache.atlas.repository.store.graph.v1; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; + public interface EntityImportStream extends EntityStream { + AtlasEntityWithExtInfo getNextEntityWithExtInfo(); + void onImportComplete(String guid); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java index 4c43921..3444bfd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStream.java @@ -18,7 +18,6 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; public interface EntityStream { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java index 241f6d0..68d7f11 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/InMemoryMapEntityStream.java @@ -19,9 +19,7 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasObjectId; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index 4743b73..d3413c2 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -38,11 +38,11 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { case ENTITIES_FOR_TAG_METRIC: return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.getProperty('__type.category').name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.getProperty('__typeName')}.cap.toList()"; case EXPORT_BY_GUID_FULL: - return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; + return "g.V('__guid', startGuid).bothE().bothV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()"; case EXPORT_BY_GUID_CONNECTED_IN_EDGE: - return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()"; + return "g.V('__guid', startGuid).inE().outV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()"; case EXPORT_BY_GUID_CONNECTED_OUT_EDGE: - return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()"; + return "g.V('__guid', startGuid).outE().inV().has('__guid').transform{[__guid:it.__guid,isProcess:(it.__superTypeNames != null) ? it.__superTypeNames.contains('Process') : false ]}.dedup().toList()"; case EXPORT_TYPE_STARTS_WITH: return "g.V().has('__typeName',typeName).filter({it.getProperty(attrName).startsWith(attrValue)}).has('__guid').__guid.toList()"; case EXPORT_TYPE_ENDS_WITH: http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index e123ff7..54faee0 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -25,6 +25,7 @@ import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; @@ -55,14 +56,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE; -import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE; -import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; -import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED; -import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH; -import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS; -import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES; -import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH; +import static org.apache.atlas.model.impexp.AtlasExportRequest.*; public class ExportService { private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); @@ -119,18 +113,22 @@ public class ExportService { } try { - List<AtlasEntity> entities = getStartingEntity(item, context); + List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context); - for (AtlasEntity entity: entities) { - processEntity(entity, context, TraversalDirection.UNKNOWN); + for (AtlasEntityWithExtInfo entityWithExtInfo : entities) { + processEntity(entityWithExtInfo.getEntity().getGuid(), context); } - while (!context.guidsToProcessIsEmpty()) { - String guid = context.guidsToProcessRemove(0); - TraversalDirection direction = context.guidDirection.get(guid); - AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); + while (!context.guidsToProcess.isEmpty()) { + while (!context.guidsToProcess.isEmpty()) { + String guid = context.guidsToProcess.remove(0); + processEntity(guid, context); + } - processEntity(entity, context, direction); + if (!context.guidsLineageToProcess.isEmpty()) { + context.guidsToProcess.addAll(context.guidsLineageToProcess); + context.guidsLineageToProcess.clear(); + } } } catch (AtlasBaseException excp) { context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); @@ -143,11 +141,11 @@ public class ExportService { } } - private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { - List<AtlasEntity> ret = new ArrayList<>(); + private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { + List<AtlasEntityWithExtInfo> ret = new ArrayList<>(); if (StringUtils.isNotEmpty(item.getGuid())) { - AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); + AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item); if (entity != null) { ret = Collections.singletonList(entity); @@ -188,17 +186,17 @@ public class ExportService { context.bindings.put("attrName", attribute.getQualifiedName()); context.bindings.put("attrValue", attrValue); - List<String> guids = executeGremlinQuery(queryTemplate, context); + List<String> guids = executeGremlinQueryForGuids(queryTemplate, context); if (CollectionUtils.isNotEmpty(guids)) { for (String guid : guids) { - AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - if (entity == null) { + if (entityWithExtInfo == null) { continue; } - ret.add(entity); + ret.add(entityWithExtInfo); } } @@ -211,24 +209,37 @@ public class ExportService { return ret; } - private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + private void processEntity(String guid, ExportContext context) throws AtlasBaseException { if (LOG.isDebugEnabled()) { - LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); + LOG.debug("==> processEntity({})", guid); } - if (!context.guidsProcessed.contains(entity.getGuid())) { - context.guidsProcessed.add(entity.getGuid()); - context.result.getData().getEntityCreationOrder().add(entity.getGuid()); + if (!context.guidsProcessed.contains(guid)) { + TraversalDirection direction = context.guidDirection.get(guid); + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + + context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + + addEntity(entityWithExtInfo, context); + addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context); + addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context); - addTypesAsNeeded(entity.getTypeName(), context); - addClassificationsAsNeeded(entity, context); - addEntity(entity, context); + context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); + getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); - getConntedEntitiesBasedOnOption(entity, context, direction); + if(entityWithExtInfo.getReferredEntities() != null) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + addTypesAsNeeded(e.getTypeName(), context); + addClassificationsAsNeeded(e, context); + getConntedEntitiesBasedOnOption(e, context, direction); + } + + context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); + } } if (LOG.isDebugEnabled()) { - LOG.debug("<== processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); + LOG.debug("<== processEntity({})", guid); } } @@ -245,7 +256,7 @@ public class ExportService { } private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { - if (direction == TraversalDirection.UNKNOWN) { + if (direction == null || direction == TraversalDirection.UNKNOWN) { getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); } else { if (isProcessEntity(entity)) { @@ -272,41 +283,35 @@ public class ExportService { String query = getQueryForTraversalDirection(direction); if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query); + LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); } context.bindings.clear(); context.bindings.put("startGuid", entity.getGuid()); - List<String> guids = executeGremlinQuery(query, context); + List<HashMap<String, Object>> result = executeGremlinQuery(query, context); - if (CollectionUtils.isEmpty(guids)) { + if (CollectionUtils.isEmpty(result)) { continue; } - for (String guid : guids) { + for (HashMap<String, Object> hashMap : result) { + String guid = (String) hashMap.get("__guid"); TraversalDirection currentDirection = context.guidDirection.get(guid); + boolean isLineage = (boolean) hashMap.get("isProcess"); if (currentDirection == null) { - context.guidDirection.put(guid, direction); + context.addToBeProcessed(isLineage, guid, direction); - if (!context.guidsToProcessContains(guid)) { - context.guidsToProcessAdd(guid); - } } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { - context.guidDirection.put(guid, direction); - // the entity should be reprocessed to get inward entities context.guidsProcessed.remove(guid); - - if (!context.guidsToProcessContains(guid)) { - context.guidsToProcessAdd(guid); - } + context.addToBeProcessed(isLineage, guid, direction); } } if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize()); + LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); } } } @@ -324,7 +329,7 @@ public class ExportService { private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize()); + LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); } String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); @@ -332,36 +337,38 @@ public class ExportService { context.bindings.clear(); context.bindings.put("startGuid", entity.getGuid()); - List<String> result = executeGremlinQuery(query, context); + List<HashMap<String, Object>> result = executeGremlinQuery(query, context); - if (result == null) { + if (CollectionUtils.isEmpty(result)) { return; } - for (String guid : result) { - if (!context.guidsProcessed.contains(guid)) { - if (!context.guidsToProcessContains(guid)) { - context.guidsToProcessAdd(guid); - } + for (HashMap<String, Object> hashMap : result) { + String guid = (String) hashMap.get("__guid"); + boolean isLineage = (boolean) hashMap.get("isProcess"); - context.guidDirection.put(guid, TraversalDirection.BOTH); + if (!context.guidsProcessed.contains(guid)) { + context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); } } if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize()); + LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); } } - private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { + private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException { context.sink.add(entity); - context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName())); - context.result.incrementMeticsCounter("entities"); - - if (context.guidsProcessed.size() % 10 == 0) { - LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size()); + context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName())); + if(entity.getReferredEntities() != null) { + for (AtlasEntity e: entity.getReferredEntities().values()) { + context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); + } } + + context.result.incrementMeticsCounter("entity:withExtInfo"); + context.reportProgress(); } private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) { @@ -394,15 +401,23 @@ public class ExportService { } } - private List<String> executeGremlinQuery(String query, ExportContext context) { + private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) { try { - return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); + return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); } catch (ScriptException e) { LOG.error("Script execution failed for query: ", query, e); return null; } } + private List<String> executeGremlinQueryForGuids(String query, ExportContext context) { + try { + return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); + } catch (ScriptException e) { + LOG.error("Script execution failed for query: ", query, e); + return null; + } + } private enum TraversalDirection { UNKNOWN, @@ -432,11 +447,57 @@ public class ExportService { } } + private class UniqueList<T> { + private final List<T> list = new ArrayList<>(); + private final Set<T> set = new HashSet<>(); + + public void add(T e) { + if(set.contains(e)) { + return; + } + + list.add(e); + set.add(e); + } + + public void addAll(UniqueList<T> uniqueList) { + for (T item : uniqueList.list) { + if(set.contains(item)) continue; + + set.add(item); + list.add(item); + } + } + + public T remove(int index) { + T e = list.remove(index); + set.remove(e); + return e; + } + + public boolean contains(T e) { + return set.contains(e); + } + + public int size() { + return list.size(); + } + + public boolean isEmpty() { + return list.isEmpty(); + } + + public void clear() { + list.clear(); + set.clear(); + } + } + private class ExportContext { final Set<String> guidsProcessed = new HashSet<>(); - private final List<String> guidsToProcessList = new ArrayList<>(); - private final Set<String> guidsToProcessSet = new HashSet<>(); + final UniqueList<String> guidsToProcess = new UniqueList<>(); + final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final AtlasExportResult result; final ZipSink sink; @@ -446,6 +507,8 @@ public class ExportService { private final ExportFetchType fetchType; private final String matchType; + private int progressReportCount = 0; + ExportContext(AtlasExportResult result, ZipSink sink) { this.result = result; this.sink = sink; @@ -481,33 +544,30 @@ public class ExportService { } public void clear() { - guidsToProcessList.clear(); - guidsToProcessSet.clear(); + guidsToProcess.clear(); guidsProcessed.clear(); guidDirection.clear(); } - public boolean guidsToProcessIsEmpty() { - return this.guidsToProcessList.isEmpty(); - } + public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) { + if(!isSuperTypeProcess) { + guidsToProcess.add(guid); + } - public String guidsToProcessRemove(int i) { - String s = this.guidsToProcessList.remove(i); - guidsToProcessSet.remove(s); - return s; - } + if(isSuperTypeProcess) { + guidsLineageToProcess.add(guid); + } - public int guidsToProcessSize() { - return this.guidsToProcessList.size(); + guidDirection.put(guid, direction); } - public boolean guidsToProcessContains(String guid) { - return guidsToProcessSet.contains(guid); - } + public void reportProgress() { + + if ((guidsProcessed.size() - progressReportCount) > 1000) { + progressReportCount = guidsProcessed.size(); - public void guidsToProcessAdd(String guid) { - this.guidsToProcessList.add(guid); - guidsToProcessSet.add(guid); + LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size()); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java index 37d9eb5..c197d41 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java @@ -45,6 +45,11 @@ public class ZipSink { saveToZip(entity.getGuid(), jsonData); } + public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { + String jsonData = convertToJSON(entityWithExtInfo); + saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData); + } + public void setResult(AtlasExportResult result) throws AtlasBaseException { String jsonData = convertToJSON(result); saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/160b2874/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java index a69f7fa..661542f 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java @@ -17,17 +17,19 @@ */ package org.apache.atlas.web.resources; -import org.apache.atlas.model.instance.AtlasEntityHeader; -import org.codehaus.jackson.type.TypeReference; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.v1.EntityImportStream; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -57,7 +59,7 @@ public class ZipSource implements EntityImportStream { public AtlasTypesDef getTypesDef() throws AtlasBaseException { final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); - String s = getFromCache(fileName); + String s = (String) getFromCache(fileName); return convertFromJson(AtlasTypesDef.class, s); } @@ -104,9 +106,10 @@ public class ZipSource implements EntityImportStream { return this.creationOrder; } - public AtlasEntity getEntity(String guid) throws AtlasBaseException { - String s = getFromCache(guid); - return convertFromJson(AtlasEntity.class, s); + public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { + String s = (String) getFromCache(guid); + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s); + return entityWithExtInfo; } private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException { @@ -136,9 +139,7 @@ public class ZipSource implements EntityImportStream { } private String getFromCache(String entryName) { - if(!guidEntityJsonMap.containsKey(entryName)) return ""; - - return guidEntityJsonMap.get(entryName).toString(); + return guidEntityJsonMap.get(entryName); } public void close() { @@ -158,8 +159,15 @@ public class ZipSource implements EntityImportStream { @Override public AtlasEntity next() { + AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); + + return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; + } + + @Override + public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { try { - return getEntity(this.iterator.next()); + return getEntityWithExtInfo(this.iterator.next()); } catch (AtlasBaseException e) { e.printStackTrace(); return null; @@ -186,10 +194,16 @@ public class ZipSource implements EntityImportStream { } } + private AtlasEntity getEntity(String guid) throws AtlasBaseException { + if(guidEntityJsonMap.containsKey(guid)) { + return getEntityWithExtInfo(guid).getEntity(); + } + + return null; + } + @Override public void onImportComplete(String guid) { - if(guid != null) { - guidEntityJsonMap.remove(guid); - } + guidEntityJsonMap.remove(guid); } }
