Repository: incubator-atlas
Updated Branches:
  refs/heads/master 0c1d599dd -> 515130ccd
ATLAS-1503: update export to specify objects-to-export using attribute value Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/515130cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/515130cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/515130cc Branch: refs/heads/master Commit: 515130ccd09003dbc0bbfd9436629c7d5648b15c Parents: 0c1d599 Author: ashutoshm <[email protected]> Authored: Fri Feb 24 15:41:44 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Sun Feb 26 20:13:46 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 8 +- .../graph/v1/AtlasEntityChangeNotifier.java | 22 +-- .../store/graph/v1/AtlasEntityStoreV1.java | 2 +- webapp/pom.xml | 1 - .../atlas/web/resources/ExportService.java | 176 +++++++++++++++---- .../apache/atlas/web/resources/ZipSource.java | 9 +- 6 files changed, 161 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 542b659..d58c514 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -20,11 +20,10 @@ package org.apache.atlas; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.Response; import java.text.MessageFormat; import java.util.Arrays; -import javax.ws.rs.core.Response; - public enum AtlasErrorCode { NO_SEARCH_RESULTS(204, "ATLAS2041E", "Given search filter {0} did not yield any 
results"), @@ -90,7 +89,10 @@ public enum AtlasErrorCode { DISCOVERY_QUERY_FAILED(500, "ATLAS5004E", "Discovery query failed {0}"), FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5005E", "Failed to get the lock; another type update might be in progress. Please try again"), FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5006E", "Another import or export is in progress. Please try again"), - NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"); + NOTIFICATION_FAILED(500, "ATLAS5007E", "Failed to notify for change {0}"), + GREMLIN_GROOVY_SCRIPT_ENGINE_FAILED(500, "ATLAS5008E", "scriptEngine cannot be initialized for: {0}"), + JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS5009E", "ObjectMapper.readValue returned NULL for class: {0}"), + GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS5010E", "Script execution failed for: {0}"); private String errorCode; private String errorMessage; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index feada34..4ec2a7c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -23,18 +23,13 @@ import com.google.inject.Singleton; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import 
org.apache.atlas.model.instance.EntityMutations.EntityOperation; -import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.converters.AtlasInstanceConverter; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graph.DeleteHandler; -import org.apache.atlas.repository.graph.FullTextMapper; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.graph.GraphToTypedInstanceMapper; -import org.apache.atlas.repository.graph.TypedInstanceToGraphMapper; +import org.apache.atlas.repository.graph.*; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.util.AtlasRepositoryConfiguration; @@ -46,11 +41,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; -import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; - @Singleton public class AtlasEntityChangeNotifier { @@ -157,11 +147,17 @@ public class AtlasEntityChangeNotifier { for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) { AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid()); + + if(atlasVertex == null) { + continue; + } + try { String fullText = fullTextMapper.mapRecursive(atlasVertex, true); + GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); } catch (AtlasException e) { - LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid()); + LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid(), e); } } } 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 587f3c7..c84f169 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -160,7 +160,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { while (entityStream.hasNext()) { AtlasEntity entity = entityStream.next(); - if(processedGuids.contains(entity.getGuid())) { + if(entity == null || processedGuids.contains(entity.getGuid())) { continue; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index a431e02..e7dce78 100755 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -377,7 +377,6 @@ <groupId>com.webcohesion.enunciate</groupId> <artifactId>enunciate-core-annotations</artifactId> </dependency> - </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index bbd48bc..1e98232 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -17,48 +17,73 @@ */ package org.apache.atlas.web.resources; +import 
com.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.impexp.*; -import org.apache.atlas.model.typedef.AtlasClassificationDef; -import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.script.*; +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; import java.util.*; public class ExportService { private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); + public static final String OPTION_ATTR_MATCH_TYPE = 
"matchType"; + public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; + public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; + public static final String MATCH_TYPE_CONTAINS = "contains"; + public static final String MATCH_TYPE_MATCHES = "matches"; + private final AtlasTypeRegistry typeRegistry; private final AtlasGraph atlasGraph; private final EntityGraphRetriever entityGraphRetriever; // query engine support - private ScriptEngineManager scriptEngineManager; - private ScriptEngine scriptEngine; - private Bindings bindings; - private final String gremlinQuery = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; - - public ExportService(final AtlasTypeRegistry typeRegistry) { + private final ScriptEngine scriptEngine; + private final Bindings bindings; + private final String queryByGuid = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; + final private String queryByAttrEquals = "g.V().has('__typeName','%s').has('%s', attrValue).has('__guid').__guid.toList()"; + final private String queryByAttrStartWith = "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()"; + final private String queryByAttrEndsWith = "g.V().has('__typeName','%s').filter({it.'%s'.endsWith(attrValue)}).has('__guid').__guid.toList()"; + final private String queryByAttrContains = "g.V().has('__typeName','%s').filter({it.'%s'.contains(attrValue)}).has('__guid').__guid.toList()"; + final private String queryByAttrMatches = "g.V().has('__typeName','%s').filter({it.'%s'.matches(attrValue)}).has('__guid').__guid.toList()"; + + public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.atlasGraph = AtlasGraphProvider.getGraphInstance(); - initScriptEngine(); + this.scriptEngine = new GremlinGroovyScriptEngine(); + + //Do not cache script 
compilations due to memory implications + scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE); + + bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); } private class ExportContext { @@ -109,16 +134,18 @@ public class ExportService { } try { - AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); + List<AtlasEntity> entities = getStartingEntity(item, context); - processEntity(entity, context); + for (AtlasEntity entity: entities) { + processEntity(entity, context); + } while (!context.guidsToProcess.isEmpty()) { String guid = context.guidsToProcess.remove(0); - entity = entityGraphRetriever.toAtlasEntity(guid); + AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid); - processEntity(entity, context); + processEntity(e, context); } } catch (AtlasBaseException excp) { context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); @@ -131,19 +158,92 @@ public class ExportService { } } + private List<AtlasEntity> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { + List<AtlasEntity> ret = new ArrayList<>(); + + if (StringUtils.isNotEmpty(item.getGuid())) { + AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); + + if (entity != null) { + ret = Collections.singletonList(entity); + } + } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) { + String typeName = item.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); + } + + AtlasExportRequest request = context.result.getRequest(); + String matchType = null; + + if (MapUtils.isNotEmpty(request.getOptions())) { + if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { + matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); + } + } + + final String 
queryTemplate; + if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) { + queryTemplate = queryByAttrStartWith; + } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) { + queryTemplate = queryByAttrEndsWith; + } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) { + queryTemplate = queryByAttrContains; + } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) { + queryTemplate = queryByAttrMatches; + } else { // default + queryTemplate = queryByAttrEquals; + } + + for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) { + String attrName = e.getKey(); + Object attrValue = e.getValue(); + + AtlasAttribute attribute = entityType.getAttribute(attrName); + + if (attribute == null || attrValue == null) { + continue; + } + + String query = String.format(queryTemplate, typeName, attribute.getQualifiedName()); + List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString()); + + if (CollectionUtils.isNotEmpty(guids)) { + for (String guid : guids) { + AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); + + if (entity == null) { + continue; + } + + ret.add(entity); + } + } + + break; + } + + LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size()); + } + + return ret; + } + private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); } if (!context.guidsProcessed.contains(entity.getGuid())) { + context.guidsProcessed.add(entity.getGuid()); + context.result.getData().getEntityCreationOrder().add(entity.getGuid()); + addTypesAsNeeded(entity.getTypeName(), context); addClassificationsAsNeeded(entity, context); addEntity(entity, context); - context.guidsProcessed.add(entity.getGuid()); - context.result.getData().getEntityCreationOrder().add(entity.getGuid()); - 
getConnectedEntityGuids(entity, context); } @@ -159,7 +259,11 @@ public class ExportService { LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); } - List<String> result = executeGremlinScriptFor(entity.getGuid()); + List<String> result = executeGremlinScriptForHive(entity.getGuid()); + if(result == null) { + return; + } + for (String guid : result) { if (!context.guidsProcessed.contains(guid)) { context.guidsToProcess.add(guid); @@ -215,22 +319,20 @@ public class ExportService { } } - private List<String> executeGremlinScriptFor(String guid) throws ScriptException { - - bindings.put("startGuid", guid); - return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, this.bindings, this.gremlinQuery, false); + private List<String> executeGremlinScriptForHive(String guid) throws ScriptException { + return executeGremlinScriptFor(this.queryByGuid, "startGuid", guid); } - private void initScriptEngine() { - if (scriptEngineManager != null) { - return; + private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) { + bindings.put(parameterName, parameterValue); + try { + return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, + this.bindings, + query, + false); + } catch (ScriptException e) { + LOG.error("Script execution failed for query: ", query, e); + return null; } - - scriptEngineManager = new ScriptEngineManager(); - scriptEngine = scriptEngineManager.getEngineByName("gremlin-groovy"); - bindings = scriptEngine.createBindings(); - - //Do not cache script compilations due to memory implications - scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/515130cc/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java 
---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java index e69a139..4596084 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java @@ -34,6 +34,8 @@ import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED; + public class ZipSource implements EntityImportStream { private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); @@ -80,7 +82,6 @@ public class ZipSource implements EntityImportStream { String entryName = zipEntry.getName().replace(".json", ""); if (guidEntityJsonMap.containsKey(entryName)) continue; - if (zipEntry == null) continue; byte[] buf = new byte[1024]; @@ -111,8 +112,12 @@ public class ZipSource implements EntityImportStream { try { ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonData, clazz); + T ret = mapper.readValue(jsonData, clazz); + if(ret == null) { + throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString()); + } + return ret; } catch (Exception e) { throw new AtlasBaseException("Error converting file to JSON.", e); }
