Repository: incubator-atlas Updated Branches: refs/heads/0.7-incubating 4c3a7f3dc -> dc0b29446
ATLAS-1403 and Performance fixes for search, lineage Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/3407303d Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/3407303d Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/3407303d Branch: refs/heads/0.7-incubating Commit: 3407303d7876ec8a90539c4d6737607116439989 Parents: 4c3a7f3 Author: Suma Shivaprasad <[email protected]> Authored: Tue Dec 27 15:09:12 2016 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Dec 27 15:09:12 2016 -0800 ---------------------------------------------------------------------- release-log.txt | 2 + .../atlas/discovery/DataSetLineageService.java | 178 +++++++++++++++---- .../graph/DefaultGraphPersistenceStrategy.java | 5 + .../graph/GraphBackedDiscoveryService.java | 3 +- .../atlas/repository/graph/GraphHelper.java | 18 +- .../org/apache/atlas/query/ClosureQuery.scala | 6 +- .../query/GraphPersistenceStrategies.scala | 22 ++- .../apache/atlas/query/GremlinEvaluator.scala | 8 +- .../org/apache/atlas/BaseRepositoryTest.java | 4 +- .../discovery/DataSetLineageServiceTest.java | 64 ++++--- .../GraphBackedMetadataRepositoryTest.java | 4 +- .../org/apache/atlas/query/GremlinTest2.scala | 12 +- .../titan/diskstorage/solr/Solr5Index.java | 1 - 13 files changed, 239 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 2543526..8f956f9 100644 --- a/release-log.txt +++ b/release-log.txt @@ -32,6 +32,8 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-1403 Performance fixes for search, lineage +ATLAS-1342 Titan Solrclient - Add timeouts for zookeeper connect and session (sumasai) ATLAS-1402 fix UI input validation ATLAS-1192 Atlas IE support (kevalbhatt) ATLAS-1215 Atlas UI not working in firefox due to fix in ATLAS-1199 (kevalbhatt) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java index c216469..5a3a8cc 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java @@ -18,7 +18,9 @@ package org.apache.atlas.discovery; +import com.google.common.base.Splitter; import com.thinkaurelius.titan.core.TitanGraph; +import com.tinkerpop.blueprints.Vertex; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; @@ -26,17 +28,30 @@ import org.apache.atlas.AtlasProperties; import org.apache.atlas.GraphTransaction; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.query.ClosureQuery; +import org.apache.atlas.query.Expressions; +import org.apache.atlas.query.GremlinEvaluator; +import org.apache.atlas.query.GremlinQuery; import org.apache.atlas.query.GremlinQueryResult; +import org.apache.atlas.query.GremlinTranslator; import org.apache.atlas.query.InputLineageClosureQuery; import org.apache.atlas.query.OutputLineageClosureQuery; import org.apache.atlas.query.QueryParams; +import org.apache.atlas.query.QueryProcessor; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.SchemaNotFoundException; -import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.typesystem.types.TypeUtils; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -45,6 +60,11 @@ import scala.collection.immutable.List; import javax.inject.Inject; import javax.inject.Singleton; +import javax.script.ScriptException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; /** * Hive implementation of Lineage service interface. @@ -61,16 +81,39 @@ public class DataSetLineageService implements LineageService { public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query."; + public static final String DATASET_SCHEMA_ATTRIBUTE = "atlas.lineage.schema.attribute."; + private static final String HIVE_PROCESS_TYPE_NAME = "Process"; private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs"; private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs"; - private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'"; - private static final String DATASET_NAME_EXISTS_QUERY = - AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'"; + private static final String INPUT_PROCESS_EDGE = "__Process.inputs"; + private static final String OUTPUT_PROCESS_EDGE = "__Process.outputs"; private static final Configuration propertiesConf; + private MetadataRepository metadataRepository; + + private final TypeSystem typeSystem = TypeSystem.getInstance(); + + private final GraphHelper graphHelper = GraphHelper.getInstance(); + + private final TitanGraph titanGraph; + private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; + private final GraphBackedDiscoveryService discoveryService; + + private Map<String, String> schemaAttributeCache = new HashMap<>(); + + /** + * Gremlin query to retrieve all (no fixed depth) input/output lineage for a DataSet entity. + * return list of Atlas vertices paths. + */ + private static final String FULL_LINEAGE_QUERY = "g.v(%s).as('src').in('%s').out('%s')." + + "loop('src', {((it.path.contains(it.object)) ? false : true)}, " + + "{((it.object.'__superTypeNames') ? " + + "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." + + "enablePath().as('dest').select(['src', 'dest'], {[it.'Asset.name', it.'Referenceable.qualifiedName']}, {[it.'Asset.name', it.'Referenceable.qualifiedName']}).path().toList()"; + static { try { propertiesConf = ApplicationProperties.get(); @@ -80,14 +123,11 @@ public class DataSetLineageService implements LineageService { } - private final TitanGraph titanGraph; - private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; - private final GraphBackedDiscoveryService discoveryService; - @Inject DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository, GraphBackedDiscoveryService discoveryService) throws DiscoveryException { this.titanGraph = graphProvider.get(); + this.metadataRepository = metadataRepository; this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); this.discoveryService = discoveryService; } @@ -103,8 +143,8 @@ public class DataSetLineageService implements LineageService { public String getOutputsGraph(String datasetName) throws AtlasException { LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName); datasetName = ParamChecker.notEmpty(datasetName, "dataset name"); - ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); - return getOutputsGraphForId(datasetInstance.getId()._getId()); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName); + return getOutputsGraphForId(typeIdPair.right); } /** @@ -118,8 +158,8 @@ public class DataSetLineageService implements LineageService { public String getInputsGraph(String tableName) throws AtlasException { LOG.info("Fetching lineage inputs graph for tableName={}", tableName); tableName = ParamChecker.notEmpty(tableName, "table name"); - ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName); - return getInputsGraphForId(datasetInstance.getId()._getId()); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName); + return getInputsGraphForId(typeIdPair.right); } @Override @@ -131,13 +171,22 @@ public class DataSetLineageService implements LineageService { return getInputsGraphForId(guid); } - private String getInputsGraphForId(String guid) { + private String getInputsGraphForId(String guid) throws AtlasException { + + Vertex instanceVertex = GraphHelper.getInstance().getVertexForGUID(guid); + Object instanceVertexId = instanceVertex.getId(); + + String lineageQuery = String.format(FULL_LINEAGE_QUERY, instanceVertexId, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE); InputLineageClosureQuery - inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, - guid, HIVE_PROCESS_TYPE_NAME, - HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), - SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - return inputsQuery.graph().toInstanceJson(); + inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, + guid, HIVE_PROCESS_TYPE_NAME, + HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), + SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); + + LOG.info("Evaluating gremlin lineage input query ={}", lineageQuery); + + GremlinQueryResult result = evaluateLineageQuery(inputsQuery, lineageQuery); + return inputsQuery.graph(result).toInstanceJson(); } @Override @@ -149,12 +198,29 @@ public class DataSetLineageService implements LineageService { return getOutputsGraphForId(guid); } - private String getOutputsGraphForId(String guid) { + private String getOutputsGraphForId(String guid) throws EntityNotFoundException { + Object instanceVertexId = graphHelper.getVertexId(guid); + String lineageQuery = String.format(FULL_LINEAGE_QUERY, instanceVertexId, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE); + OutputLineageClosureQuery outputsQuery = new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME, HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(), SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph); - return outputsQuery.graph().toInstanceJson(); + + LOG.info("Evaluating gremlin lineage output query ={}", lineageQuery); + + GremlinQueryResult result = evaluateLineageQuery(outputsQuery, lineageQuery); + return outputsQuery.graph(result).toInstanceJson(); + } + + private GremlinQueryResult evaluateLineageQuery(ClosureQuery closureQuery, String lineageQuery) { + Expressions.Expression validatedExpression = QueryProcessor.validate(closureQuery.expr()); + GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate(); + //Replace with handcrafted gremlin till we optimize the DSL query associated with guid + typeName + GremlinQuery optimizedQuery = new GremlinQuery(gremlinQuery.expr(), lineageQuery, gremlinQuery.resultMaping()); + + + return new GremlinEvaluator(optimizedQuery, graphPersistenceStrategy, titanGraph).evaluate(); } /** @@ -168,12 +234,12 @@ public class DataSetLineageService implements LineageService { public String getSchema(String datasetName) throws AtlasException { datasetName = ParamChecker.notEmpty(datasetName, "table name"); LOG.info("Fetching schema for tableName={}", datasetName); - ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName); - return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId()); + return getSchemaForId(typeIdPair.left, typeIdPair.right); } - private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException { + private String getSchemaForId(String typeName, String guid) throws AtlasException { String configName = DATASET_SCHEMA_QUERY_PREFIX + typeName; if (propertiesConf.getString(configName) != null) { final String schemaQuery = @@ -184,13 +250,45 @@ public class DataSetLineageService implements LineageService { throw new SchemaNotFoundException("Schema is not configured for type " + typeName + ". Configure " + configName); } + private String getSchemaForId(ITypedReferenceableInstance instance, ClassType type) throws AtlasException { + String configName = DATASET_SCHEMA_QUERY_PREFIX + instance.getTypeName(); + String schemaAttrName = null; + + if ( schemaAttributeCache.containsKey(instance.getTypeName()) ) { + schemaAttrName = schemaAttributeCache.get(instance.getTypeName()); + } else { + schemaAttrName = getSchemaAttributeName(configName, instance.getTypeName()); + schemaAttributeCache.put(instance.getTypeName(), schemaAttrName); + } + + if (schemaAttrName != null) { + java.util.List schemaValue = (java.util.List) instance.get(schemaAttrName); + GremlinQueryResult queryResult = new GremlinQueryResult(schemaAttrName, type, schemaValue); + return queryResult.toJson(); + } + throw new SchemaNotFoundException("Schema is not configured for type " + instance.getTypeName() + ". Configure " + configName); + } + + + private String getSchemaAttributeName(String configName, String typeName) throws SchemaNotFoundException { + String schemaQuery = propertiesConf.getString(configName); + final String[] configs = schemaQuery != null ? schemaQuery.split(",") : null; + + if (configs != null && configs.length == 2) { + LOG.info("Extracted schema attribute {} for type {} with query {} ", configs[1], typeName, schemaQuery); + return configs[1].trim(); + } else { + throw new SchemaNotFoundException("Schema is not configured as expected for type " + typeName); + } + } + @Override @GraphTransaction public String getSchemaForEntity(String guid) throws AtlasException { guid = ParamChecker.notEmpty(guid, "Entity id"); LOG.info("Fetching schema for entity guid={}", guid); - String typeName = validateDatasetExists(guid); - return getSchemaForId(typeName, guid); + Pair<ITypedReferenceableInstance, ClassType> instanceClassTypePair = validateDatasetExists(guid); + return getSchemaForId(instanceClassTypePair.getLeft(), instanceClassTypePair.getRight()); } /** @@ -198,14 +296,16 @@ public class DataSetLineageService implements LineageService { * * @param datasetName table name */ - private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException { - final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName); - GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0)); - if (!(queryResult.rows().length() > 0)) { - throw new EntityNotFoundException(datasetName + " does not exist"); + private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException { + Iterator<Vertex> results = titanGraph.query().has("Referenceable.qualifiedName", datasetName) + .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name()) + .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE).limit(1) + .vertices().iterator(); + while (results.hasNext()) { + Vertex vertex = results.next(); + return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex)); } - - return (ReferenceableInstance)queryResult.rows().apply(0); + throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist"); } /** @@ -213,14 +313,14 @@ public class DataSetLineageService implements LineageService { * * @param guid entity id */ - private String validateDatasetExists(String guid) throws AtlasException { - final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid); - GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0)); - if (!(queryResult.rows().length() > 0)) { + private Pair<ITypedReferenceableInstance, ClassType> validateDatasetExists(String guid) throws AtlasException { + + ITypedReferenceableInstance instance = metadataRepository.getEntityDefinition(guid); + String typeName = instance.getTypeName(); + ClassType clsType = typeSystem.getDataType(ClassType.class, typeName); + if ( !clsType.superTypes.contains(AtlasClient.DATA_SET_SUPER_TYPE) ) { throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist"); } - - ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0); - return referenceable.getTypeName(); + return Pair.of(instance, clsType); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java index b17eec7..5143fc8 100755 --- a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java +++ b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java @@ -247,6 +247,11 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi } @Override + public boolean filterBySubTypes() { + return GraphPersistenceStrategies$class.filterBySubTypes(this); + } + + @Override public boolean addGraphVertexPrefix(scala.collection.Traversable<String> preStatements) { return GraphPersistenceStrategies$class.addGraphVertexPrefix(this, preStatements); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java index 0c029bb..060b086 100755 --- a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java @@ -56,6 +56,7 @@ import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -150,7 +151,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService { //If the final limit is 0, don't launch the query, return with 0 rows if (validatedExpression instanceof Expressions.LimitExpression && ((Integer)((Expressions.LimitExpression) validatedExpression).limit().rawValue()) == 0) { - return new GremlinQueryResult(dslQuery, validatedExpression.dataType()); + return new GremlinQueryResult(dslQuery, validatedExpression.dataType(), Collections.emptyList()); } GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 334177c..fb2c2ee 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -19,8 +19,10 @@ package org.apache.atlas.repository.graph; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanProperty; import com.thinkaurelius.titan.core.TitanVertex; @@ -30,6 +32,7 @@ import com.tinkerpop.blueprints.Element; import com.tinkerpop.blueprints.Graph; import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.pipes.util.structures.Row; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; @@ -50,11 +53,17 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -579,4 +588,11 @@ public final class GraphHelper { } return key; } -} + + public Object getVertexId(String guid) throws EntityNotFoundException { + Vertex instanceVertex = getVertexForGUID(guid); + Object instanceVertexId = instanceVertex.getId(); + return instanceVertexId; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala index c4621cd..f2dd4e6 100755 --- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala +++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala @@ -147,7 +147,7 @@ trait ClosureQuery { QueryProcessor.evaluate(e, g, persistenceStrategy) } - def graph : GraphResult = { + def graph(res: GremlinQueryResult) : GraphResult = { if (!withPath) { throw new ExpressionException(expr, "Graph requested for non Path Query") @@ -155,8 +155,6 @@ trait ClosureQuery { import scala.collection.JavaConverters._ - val res = evaluate() - val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType]) val vertexPayloadType = { val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName). @@ -187,7 +185,7 @@ trait ClosureQuery { * add an entry for the Src Vertex to the vertex Map * add an entry for the Dest Vertex to the vertex Map */ - res.rows.map(_.asInstanceOf[StructInstance]).foreach { r => + res.rows.asScala.map(_.asInstanceOf[StructInstance]).foreach { r => val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala val srcVertex = path.head.asInstanceOf[StructInstance] http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala index f774d97..92b7e96 100755 --- a/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala +++ b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala @@ -30,7 +30,9 @@ import org.apache.atlas.repository.graph.{GraphHelper, GraphBackedMetadataReposi import org.apache.atlas.typesystem.persistence.Id import org.apache.atlas.typesystem.types.DataTypes._ import org.apache.atlas.typesystem.types._ +import org.apache.atlas.typesystem.types.cache.TypeCache import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance} +import org.elasticsearch.common.collect.ImmutableList import scala.collection.JavaConversions._ import scala.collection.mutable @@ -149,10 +151,14 @@ trait GraphPersistenceStrategies { * * @return */ - def collectTypeInstancesIntoVar = true + def collectTypeInstancesIntoVar = false + + def filterBySubTypes = true def typeTestExpression(typeName : String, intSeq : IntSequence) : Seq[String] = { - if (collectTypeInstancesIntoVar) + if (filterBySubTypes) + typeTestExpressionUsingInFilter(typeName) + else if (collectTypeInstancesIntoVar) typeTestExpressionMultiStep(typeName, intSeq) else typeTestExpressionUsingFilter(typeName) @@ -180,6 +186,18 @@ trait GraphPersistenceStrategies { ) } + private def typeTestExpressionUsingInFilter(typeName: String) = { + val filters = collection.mutable.Map[TypeCache.TYPE_FILTER, String](); + filters put (TypeCache.TYPE_FILTER.SUPERTYPE, typeName) + val subTypes : com.google.common.collect.ImmutableList[String] = TypeSystem.getInstance().getTypeNames(filters) + val typeNames = new util.ArrayList[String]() + typeNames.add(typeName) + if ( !subTypes.isEmpty ) + typeNames.addAll(subTypes) + + Seq(s"""has("${typeAttributeName}", T.in, ${typeNames.mkString("['", "','", "']")})""") + } + private def newSetVar(varName : String) = s"$varName = [] as Set" private def fillVarWithTypeInstances(typeName : String, fillVar : String) = { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala index 10d66a9..2020b44 100755 --- a/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala +++ b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala @@ -29,14 +29,12 @@ import org.json4s._ import org.json4s.native.Serialization._ import scala.language.existentials import org.apache.atlas.query.Expressions._ +import scala.collection.JavaConversions._ + case class GremlinQueryResult(query: String, resultDataType: IDataType[_], - rows: List[_]) { - def this(query: String,resultDataType: IDataType[_]) { - this(query,resultDataType,List.empty) - } - + rows: java.util.List[_]) { def toJson = JsonHelper.toJson(this) } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java index 500a305..04eb8a4 100644 --- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.util.TitanCleanup; - import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphProvider; @@ -46,7 +45,6 @@ import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.testng.annotations.Guice; import javax.inject.Inject; - import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -329,7 +327,7 @@ public class BaseRepositoryTest { List<Referenceable> columns, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); referenceable.set("name", name); - referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("tableType", tableType); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java index b675459..a0ee26c 100644 --- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java @@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id; import org.apache.commons.collections.ArrayStack; import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { testInvalidArguments(expectedException, new Invoker() { @Override void run() throws AtlasException { - lineageService.getInputsGraphForEntity(tableName); + lineageService.getInputsGraph(tableName); } }); } @Test public void testGetInputsGraph() throws Exception { - JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv")); + JSONObject results = getInputsGraph("sales_fact_monthly_mv"); assertNotNull(results); System.out.println("inputs graph = " + results); @@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { @Test public void testCircularLineage() throws Exception{ - JSONObject results = new JSONObject(lineageService.getInputsGraph("table2")); + JSONObject results = getInputsGraph("table2"); assertNotNull(results); System.out.println("inputs graph = " + results); @@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { } @Test(dataProvider = "invalidArgumentsProvider") - public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException) + public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException) throws Exception { testInvalidArguments(expectedException, new Invoker() { @Override void run() throws AtlasException { - lineageService.getOutputsGraphForEntity(tableName); + lineageService.getOutputsGraphForEntity(tableId); } }); } @Test public void testGetOutputsGraph() throws Exception { - JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact")); + JSONObject results = getOutputsGraph("sales_fact"); assertNotNull(results); System.out.println("outputs graph = " + results); @@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { @Test(dataProvider = "tableNamesProvider") public void testGetSchema(String tableName, String expected) throws Exception { - JSONObject results = new JSONObject(lineageService.getSchema(tableName)); + JSONObject results = getSchema(tableName); assertNotNull(results); System.out.println("columns = " + results); @@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { Assert.assertEquals(rows.length(), Integer.parseInt(expected)); for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - assertNotNull(row.getString("name")); - assertNotNull(row.getString("comment")); - assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + assertColumn(rows.getJSONObject(index)); } } @@ -305,14 +302,17 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { Assert.assertEquals(rows.length(), Integer.parseInt(expected)); for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - assertNotNull(row.getString("name")); - assertNotNull(row.getString("comment")); - assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + assertColumn(rows.getJSONObject(index)); } } + private void assertColumn(JSONObject jsonObject) throws JSONException { + assertNotNull(jsonObject.getString("name")); + assertNotNull(jsonObject.getString("comment")); + assertNotNull(jsonObject.getString("dataType")); + Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column"); + } + @Test(expectedExceptions = SchemaNotFoundException.class) public void testGetSchemaForDBEntity() throws Exception { String dbId = getEntityId(DATASET_SUBTYPE, "name", "dataSetSubTypeInst1"); @@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { }); } + private JSONObject getSchema(String tableName) throws Exception { + return new JSONObject(lineageService.getSchema("qualified:" + tableName)); + } + + private JSONObject getInputsGraph(String tableName) throws Exception { + return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName)); + } + + private JSONObject getOutputsGraph(String tableName) throws Exception { + return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName)); + } + @Test public void testLineageWithDelete() throws Exception { String tableName = "table" + random(); createTable(tableName, 3, true); String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); - JSONObject results = new JSONObject(lineageService.getSchema(tableName)); + JSONObject results = getSchema(tableName); assertEquals(results.getJSONArray("rows").length(), 3); - results = new JSONObject(lineageService.getInputsGraph(tableName)); + results = getInputsGraph(tableName); Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true); Map<String, Struct> vertices = (Map) resultInstance.get("vertices"); assertEquals(vertices.size(), 2); Struct vertex = vertices.get(tableId); assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name()); - results = new JSONObject(lineageService.getOutputsGraph(tableName)); + results = getOutputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); results = new JSONObject(lineageService.getSchemaForEntity(tableId)); @@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); try { - lineageService.getSchema(tableName); + getSchema(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected } try { - lineageService.getInputsGraph(tableName); + getInputsGraph(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected } try { - lineageService.getOutputsGraph(tableName); + getOutputsGraph(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected @@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { //Create table again should show new lineage createTable(tableName, 2, false); - results = new JSONObject(lineageService.getSchema(tableName)); + results = getSchema(tableName); assertEquals(results.getJSONArray("rows").length(), 2); - results = new JSONObject(lineageService.getOutputsGraph(tableName)); + results = getOutputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); - results = new JSONObject(lineageService.getInputsGraph(tableName)); + results = getInputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java index 2541541..69bb45b 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java @@ -135,8 +135,8 @@ public class GraphBackedMetadataRepositoryTest { } } - @Test //In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104 + @Test public void testConcurrentCalls() throws Exception { final HierarchicalTypeDefinition<ClassType> refType = createClassTypeDef(randomString(), ImmutableSet.<String>of()); @@ -188,7 +188,7 @@ public class GraphBackedMetadataRepositoryTest { private boolean assertEdge(String id, String typeName) throws Exception { TitanGraph graph = graphProvider.get(); - Vertex vertex = (Vertex)graph.query().has(Constants.GUID_PROPERTY_KEY, id).vertices().iterator().next(); + Vertex vertex = (Vertex) graph.query().has(Constants.GUID_PROPERTY_KEY, id).vertices().iterator().next(); Iterable<Edge> edges = vertex.getEdges(Direction.OUT, Constants.INTERNAL_PROPERTY_KEY_PREFIX + typeName + ".ref"); if (!edges.iterator().hasNext()) { ITypedReferenceableInstance entity = repositoryService.getEntityDefinition(id); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala ---------------------------------------------------------------------- diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala index f65cedb..b0961b0 100755 --- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala +++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala @@ -116,11 +116,13 @@ class GremlinTest2 extends BaseGremlinTest { } @Test def testHighLevelLineageReturnGraph { - val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv", + val q = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv", "LoadProcess", "inputTables", "outputTable", - None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph + None, Some(List("name")), true, GraphPersistenceStrategy1, g); + val gr = q.evaluate(); + val r = q.graph(gr); println(r.toInstanceJson) //validateJson(r) @@ -136,11 +138,13 @@ class GremlinTest2 extends BaseGremlinTest { } @Test def testHighLevelWhereUsedReturnGraph { - val r = OutputLineageClosureQuery("Table", "name", "sales_fact", + val q = OutputLineageClosureQuery("Table", "name", "sales_fact", "LoadProcess", "inputTables", "outputTable", - None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph + None, Some(List("name")), true, GraphPersistenceStrategy1, g) + val gr = q.evaluate(); + val r = q.graph(gr); println(r.toInstanceJson) } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java index f3b9fd9..0176208 100644 --- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java @@ -224,7 +224,6 @@ public class Solr5Index implements IndexProvider { logger.info("Zookeeper session timeout : " + config.get(ZOOKEEPER_SESSION_TIMEOUT)); cloudServer.setZkClientTimeout(config.get(ZOOKEEPER_SESSION_TIMEOUT)); - cloudServer.connect(); solrClient = cloudServer; } else if (mode==Mode.HTTP) {
