http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/GraphBackedTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/GraphBackedTypeStore.java deleted file mode 100755 index c1da867..0000000 --- a/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/GraphBackedTypeStore.java +++ /dev/null @@ -1,330 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.hadoop.metadata.repository.typestore;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.thinkaurelius.titan.core.TitanGraph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.Constants;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.AttributeInfo;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.EnumType;
import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.EnumValue;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalType;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.StructType;
import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * {@link ITypeStore} implementation that persists type definitions as vertices in a Titan graph.
 * Each non-core type gets one vertex (marked with {@link #VERTEX_TYPE}); attributes are stored as
 * vertex properties (attribute metadata serialized to JSON), supertype and attribute-reference
 * relationships are stored as edges between type vertices.
 */
@Singleton
public class GraphBackedTypeStore implements ITypeStore {
    /** Marker value for {@link Constants#VERTEX_TYPE_PROPERTY_KEY} identifying type-system vertices. */
    public static final String VERTEX_TYPE = "typeSystem";
    private static final String PROPERTY_PREFIX = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type.";
    // NOTE(review): PROPERTY_PREFIX already ends with '.', so this label contains a double dot
    // ("...type..supertype"). Kept as-is: the label is persisted in the graph and changing it
    // would orphan edges written by earlier versions.
    public static final String SUPERTYPE_EDGE_LABEL = PROPERTY_PREFIX + ".supertype";

    private static final Logger LOG = LoggerFactory.getLogger(GraphBackedTypeStore.class);

    private final TitanGraph titanGraph;

    @Inject
    public GraphBackedTypeStore(GraphProvider<TitanGraph> graphProvider) {
        titanGraph = graphProvider.get();
    }

    @Override
    public void store(TypeSystem typeSystem) throws MetadataException {
        // Persist every type currently registered in the type system.
        store(typeSystem, ImmutableList.copyOf(typeSystem.getTypeNames()));
    }

    @Override
    public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException {
        ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
        for (String typeName : typeNames) {
            if (!coreTypes.contains(typeName)) {   // core/built-in types are never persisted
                IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
                LOG.debug("Processing {}.{} in type store", dataType.getTypeCategory(), dataType.getName());
                switch (dataType.getTypeCategory()) {
                case ENUM:
                    storeInGraph((EnumType) dataType);
                    break;

                case STRUCT:
                    StructType structType = (StructType) dataType;
                    storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
                            ImmutableList.copyOf(structType.infoToNameMap.keySet()), ImmutableList.<String>of());
                    break;

                case TRAIT:
                case CLASS:
                    HierarchicalType type = (HierarchicalType) dataType;
                    storeInGraph(typeSystem, dataType.getTypeCategory(), dataType.getName(),
                            type.immediateAttrs, type.superTypes);
                    break;

                default:    //Ignore primitive/collection types as they are covered under references
                    break;
                }
            }
        }
    }

    private void addProperty(Vertex vertex, String propertyName, Object value) {
        LOG.debug("Setting property {} = \"{}\" to vertex {}", propertyName, value, vertex);
        vertex.setProperty(propertyName, value);
    }

    /** Persists an enum type: one property per enum value (name -> ordinal) plus the value-name list. */
    private void storeInGraph(EnumType dataType) {
        Vertex vertex = createVertex(dataType.getTypeCategory(), dataType.getName());
        List<String> values = new ArrayList<>(dataType.values().size());
        for (EnumValue enumValue : dataType.values()) {
            String key = getPropertyKey(dataType.getName(), enumValue.value);
            addProperty(vertex, key, enumValue.ordinal);
            values.add(enumValue.value);
        }
        addProperty(vertex, getPropertyKey(dataType.getName()), values);
    }

    private String getPropertyKey(String name) {
        return PROPERTY_PREFIX + name;
    }

    private String getPropertyKey(String parent, String child) {
        return PROPERTY_PREFIX + parent + "." + child;
    }

    private String getEdgeLabel(String parent, String child) {
        return PROPERTY_PREFIX + "edge." + parent + "." + child;
    }

    /**
     * Persists a struct/class/trait type: attribute definitions are stored as JSON properties,
     * supertypes as {@link #SUPERTYPE_EDGE_LABEL} edges, complex attribute types as reference edges.
     *
     * @throws MetadataException if attribute serialization fails or a supertype cannot be resolved
     */
    private void storeInGraph(TypeSystem typeSystem, DataTypes.TypeCategory category, String typeName,
            ImmutableList<AttributeInfo> attributes, ImmutableList<String> superTypes) throws MetadataException {
        Vertex vertex = createVertex(category, typeName);
        List<String> attrNames = new ArrayList<>();
        if (attributes != null) {
            for (AttributeInfo attribute : attributes) {
                String propertyKey = getPropertyKey(typeName, attribute.name);
                try {
                    addProperty(vertex, propertyKey, attribute.toJson());
                } catch (JSONException e) {
                    throw new StorageException(typeName, e);
                }
                attrNames.add(attribute.name);
                addReferencesForAttribute(typeSystem, vertex, attribute);
            }
        }
        // The attribute-name list is stored under the bare type key so restore() can enumerate them.
        addProperty(vertex, getPropertyKey(typeName), attrNames);

        //Add edges for hierarchy
        if (superTypes != null) {
            for (String superTypeName : superTypes) {
                HierarchicalType superType = typeSystem.getDataType(HierarchicalType.class, superTypeName);
                Vertex superVertex = createVertex(superType.getTypeCategory(), superTypeName);
                addEdge(vertex, superVertex, SUPERTYPE_EDGE_LABEL);
            }
        }
    }

    //Add edges for complex attributes
    private void addReferencesForAttribute(TypeSystem typeSystem, Vertex vertex, AttributeInfo attribute)
            throws MetadataException {
        ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
        List<IDataType> attrDataTypes = new ArrayList<>();
        IDataType attrDataType = attribute.dataType();
        String vertexTypeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);

        switch (attrDataType.getTypeCategory()) {
        case ARRAY:
            // An array attribute references its element type.
            String attrType = TypeUtils.parseAsArrayType(attrDataType.getName());
            IDataType elementType = typeSystem.getDataType(IDataType.class, attrType);
            attrDataTypes.add(elementType);
            break;

        case MAP:
            // A map attribute references both its key and value types.
            String[] attrTypes = TypeUtils.parseAsMapType(attrDataType.getName());
            IDataType keyType = typeSystem.getDataType(IDataType.class, attrTypes[0]);
            IDataType valueType = typeSystem.getDataType(IDataType.class, attrTypes[1]);
            attrDataTypes.add(keyType);
            attrDataTypes.add(valueType);
            break;

        case ENUM:
        case STRUCT:
        case CLASS:
            attrDataTypes.add(attrDataType);
            break;

        case PRIMITIVE: //no vertex for primitive type, hence no edge required
            break;

        default:
            throw new IllegalArgumentException(
                    "Attribute cannot reference instances of type : " + attrDataType.getTypeCategory());
        }

        for (IDataType attrType : attrDataTypes) {
            if (!coreTypes.contains(attrType.getName())) {
                Vertex attrVertex = createVertex(attrType.getTypeCategory(), attrType.getName());
                String label = getEdgeLabel(vertexTypeName, attribute.name);
                addEdge(vertex, attrVertex, label);
            }
        }
    }

    private void addEdge(Vertex fromVertex, Vertex toVertex, String label) {
        // FIX: the format string was concatenated with the first argument
        // ("... label {}" + toString(fromVertex)), which bound the wrong values to the
        // placeholders. Pass all three as parameterized arguments instead.
        LOG.debug("Adding edge from {} to {} with label {}", toString(fromVertex), toString(toVertex), label);
        titanGraph.addEdge(null, fromVertex, toVertex, label);
    }

    @Override
    @GraphTransaction
    public TypesDef restore() throws MetadataException {
        //Get all vertices for type system
        Iterator vertices =
                titanGraph.query().has(Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE).vertices().iterator();

        ImmutableList.Builder<EnumTypeDefinition> enums = ImmutableList.builder();
        ImmutableList.Builder<StructTypeDefinition> structs = ImmutableList.builder();
        ImmutableList.Builder<HierarchicalTypeDefinition<ClassType>> classTypes = ImmutableList.builder();
        ImmutableList.Builder<HierarchicalTypeDefinition<TraitType>> traits = ImmutableList.builder();

        while (vertices.hasNext()) {
            Vertex vertex = (Vertex) vertices.next();
            DataTypes.TypeCategory typeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY);
            String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
            LOG.info("Restoring type {}.{}", typeCategory, typeName);
            switch (typeCategory) {
            case ENUM:
                enums.add(getEnumType(vertex));
                break;

            case STRUCT:
                AttributeDefinition[] attributes = getAttributes(vertex, typeName);
                structs.add(new StructTypeDefinition(typeName, attributes));
                break;

            case CLASS:
                ImmutableList<String> superTypes = getSuperTypes(vertex);
                attributes = getAttributes(vertex, typeName);
                classTypes.add(new HierarchicalTypeDefinition<>(ClassType.class, typeName, superTypes, attributes));
                break;

            case TRAIT:
                superTypes = getSuperTypes(vertex);
                attributes = getAttributes(vertex, typeName);
                traits.add(new HierarchicalTypeDefinition<>(TraitType.class, typeName, superTypes, attributes));
                break;

            default:
                throw new IllegalArgumentException("Unhandled type category " + typeCategory);
            }
        }
        return TypeUtils.getTypesDef(enums.build(), structs.build(), traits.build(), classTypes.build());
    }

    /** Rebuilds an enum definition from its vertex; inverse of {@link #storeInGraph(EnumType)}. */
    private EnumTypeDefinition getEnumType(Vertex vertex) {
        String typeName = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
        List<EnumValue> enumValues = new ArrayList<>();
        List<String> values = vertex.getProperty(getPropertyKey(typeName));
        for (String value : values) {
            String valueProperty = getPropertyKey(typeName, value);
            enumValues.add(new EnumValue(value, vertex.<Integer>getProperty(valueProperty)));
        }
        return new EnumTypeDefinition(typeName, enumValues.toArray(new EnumValue[enumValues.size()]));
    }

    /** Follows outgoing supertype edges and collects the supertype names. */
    private ImmutableList<String> getSuperTypes(Vertex vertex) {
        List<String> superTypes = new ArrayList<>();
        Iterator<Edge> edges = vertex.getEdges(Direction.OUT, SUPERTYPE_EDGE_LABEL).iterator();
        while (edges.hasNext()) {
            Edge edge = edges.next();
            superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY));
        }
        return ImmutableList.copyOf(superTypes);
    }

    /**
     * Deserializes the attribute definitions stored on a type vertex.
     *
     * @throws MetadataException if a stored attribute's JSON cannot be parsed
     */
    private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws MetadataException {
        List<AttributeDefinition> attributes = new ArrayList<>();
        List<String> attrNames = vertex.getProperty(getPropertyKey(typeName));
        if (attrNames != null) {
            for (String attrName : attrNames) {
                try {
                    String propertyKey = getPropertyKey(typeName, attrName);
                    attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey)));
                } catch (JSONException e) {
                    throw new MetadataException(e);
                }
            }
        }
        return attributes.toArray(new AttributeDefinition[attributes.size()]);
    }

    private String toString(Vertex vertex) {
        return PROPERTY_PREFIX + "." + vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY);
    }

    /**
     * Find vertex for the given type category and name, else create new vertex
     * @param category type category (currently unused in the lookup -- see note below)
     * @param typeName type name to look up
     * @return vertex, or null when no vertex with that name exists
     */
    private Vertex findVertex(DataTypes.TypeCategory category, String typeName) {
        LOG.debug("Finding vertex for {}.{}", category, typeName);

        // NOTE(review): the query matches on typeName only; the category parameter is not part of
        // the lookup. This relies on type names being globally unique across categories -- confirm.
        Iterator results = titanGraph.query().has(Constants.TYPENAME_PROPERTY_KEY, typeName).vertices().iterator();
        Vertex vertex = null;
        if (results != null && results.hasNext()) {
            //There should be just one vertex with the given typeName
            vertex = (Vertex) results.next();
        }
        return vertex;
    }

    /** Returns the existing vertex for the type, creating and marking a new one if absent. */
    private Vertex createVertex(DataTypes.TypeCategory category, String typeName) {
        Vertex vertex = findVertex(category, typeName);
        if (vertex == null) {
            LOG.debug("Adding vertex {}{}", PROPERTY_PREFIX, typeName);
            vertex = titanGraph.addVertex(null);
            addProperty(vertex, Constants.VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE); //Mark as type vertex
            addProperty(vertex, Constants.TYPE_CATEGORY_PROPERTY_KEY, category);
            addProperty(vertex, Constants.TYPENAME_PROPERTY_KEY, typeName);
        }
        return vertex;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/ITypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/ITypeStore.java b/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/ITypeStore.java deleted file mode 100755 index 930ec1e..0000000 --- a/repository/src/main/java/org/apache/hadoop/metadata/repository/typestore/ITypeStore.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.hadoop.metadata.repository.typestore;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;

/**
 * Persistence contract for type definitions: implementations store types from a
 * {@link TypeSystem} and restore them as a {@link TypesDef}.
 */
public interface ITypeStore {
    /**
     * Persist the entire type system - insert or update
     * @param typeSystem type system to persist
     * @throws MetadataException on storage failure (implementations may throw the
     *         {@link StorageException} subclass)
     */
    void store(TypeSystem typeSystem) throws MetadataException;

    /**
     * Persist the given types in the type system - insert or update
     * @param typeSystem type system the names belong to
     * @param types names of the types to persist
     * @throws MetadataException on storage failure (implementations may throw the
     *         {@link StorageException} subclass)
     */
    void store(TypeSystem typeSystem, ImmutableList<String> types) throws MetadataException;

    /**
     * Restore all type definitions
     * @return a {@link TypesDef} aggregating all persisted type definitions
     * @throws MetadataException on retrieval/deserialization failure
     */
    TypesDef restore() throws MetadataException;
}
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.repository.typestore; - -import org.apache.hadoop.metadata.MetadataException; - -public class StorageException extends MetadataException { - public StorageException(String type) { - super("Failure in typesystem storage for type " + type); - } - - public StorageException(String type, Throwable cause) { - super("Failure in typesystem storage for type " + type, cause); - } - - public StorageException(Throwable cause) { - super("Failure in type system storage", cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/java/org/apache/hadoop/metadata/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/hadoop/metadata/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/hadoop/metadata/services/DefaultMetadataService.java deleted file mode 100755 index a5b647f..0000000 --- a/repository/src/main/java/org/apache/hadoop/metadata/services/DefaultMetadataService.java +++ /dev/null @@ -1,427 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
package org.apache.hadoop.metadata.services;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.GraphTransaction;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.ParamChecker;
import org.apache.hadoop.metadata.classification.InterfaceAudience;
import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.listener.EntityChangeListener;
import org.apache.hadoop.metadata.listener.TypesChangeListener;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.repository.typestore.ITypeStore;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.typesystem.ITypedStruct;
import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.Struct;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.StructTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.TypeUtils;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Simple wrapper over TypeSystem and MetadataRepository services with hooks
 * for listening to changes to the repository.
 */
@Singleton
public class DefaultMetadataService implements MetadataService {

    private static final Logger LOG =
            LoggerFactory.getLogger(DefaultMetadataService.class);

    // LinkedHashSet so listeners fire in registration order.
    private final Set<TypesChangeListener> typesChangeListeners = new LinkedHashSet<>();
    private final Set<EntityChangeListener> entityChangeListeners
            = new LinkedHashSet<>();

    private final TypeSystem typeSystem;
    private final MetadataRepository repository;
    private final ITypeStore typeStore;

    @Inject
    DefaultMetadataService(MetadataRepository repository,
                           SearchIndexer searchIndexer, ITypeStore typeStore) throws MetadataException {
        this.typeStore = typeStore;
        this.typeSystem = TypeSystem.getInstance();
        this.repository = repository;

        registerListener(searchIndexer);
        restoreTypeSystem();
    }

    /**
     * Loads previously persisted types into the in-memory type system at startup.
     * Wraps failures in RuntimeException because the service cannot operate without them.
     */
    private void restoreTypeSystem() {
        LOG.info("Restoring type system from the store");
        try {
            TypesDef typesDef = typeStore.restore();
            typeSystem.defineTypes(typesDef);

            // Persisted types must be restored before the built-in super types are
            // (re)registered, since createSuperTypes() checks the type system first.
            createSuperTypes();

        } catch (MetadataException e) {
            throw new RuntimeException(e);
        }
        LOG.info("Restored type system from the store");
    }

    private static final AttributeDefinition NAME_ATTRIBUTE =
            TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
    private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
            TypesUtil.createOptionalAttrDef("description", DataTypes.STRING_TYPE);

    /**
     * Registers the built-in super types (Infrastructure, DataSet, Process) if they are
     * not already present in the type system.
     */
    @InterfaceAudience.Private
    private void createSuperTypes() throws MetadataException {
        if (typeSystem.isRegistered(MetadataServiceClient.DATA_SET_SUPER_TYPE)) {
            return; // this is already registered
        }

        HierarchicalTypeDefinition<ClassType> infraType =
                TypesUtil.createClassTypeDef(MetadataServiceClient.INFRASTRUCTURE_SUPER_TYPE,
                        ImmutableList.<String>of(), NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);

        HierarchicalTypeDefinition<ClassType> datasetType = TypesUtil
                .createClassTypeDef(MetadataServiceClient.DATA_SET_SUPER_TYPE, ImmutableList.<String>of(),
                        NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);

        HierarchicalTypeDefinition<ClassType> processType = TypesUtil
                .createClassTypeDef(MetadataServiceClient.PROCESS_SUPER_TYPE, ImmutableList.<String>of(),
                        NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE, new AttributeDefinition("inputs",
                                DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
                                Multiplicity.OPTIONAL, false, null),
                        new AttributeDefinition("outputs",
                                DataTypes.arrayTypeName(MetadataServiceClient.DATA_SET_SUPER_TYPE),
                                Multiplicity.OPTIONAL, false, null));

        TypesDef typesDef = TypeUtils
                .getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
                        ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
                        ImmutableList.of(infraType, datasetType, processType));
        createType(TypesSerialization.toJson(typesDef));
    }

    /**
     * Creates a new type based on the type system to enable adding
     * entities (instances for types).
     *
     * @param typeDefinition definition as json
     * @return a unique id for this type
     */
    @Override
    @GraphTransaction
    public JSONObject createType(String typeDefinition) throws MetadataException {
        ParamChecker.notEmpty(typeDefinition, "type definition cannot be empty");

        TypesDef typesDef;
        try {
            typesDef = TypesSerialization.fromJson(typeDefinition);
            if (typesDef.isEmpty()) {
                throw new MetadataException("Invalid type definition");
            }
        } catch (Exception e) {
            LOG.error("Unable to deserialize json={}", typeDefinition, e);
            // FIX: preserve the cause instead of dropping it.
            throw new IllegalArgumentException("Unable to deserialize json", e);
        }

        try {
            final Map<String, IDataType> typesAdded = typeSystem.defineTypes(typesDef);

            try {
                typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
                onTypesAddedToRepo(typesAdded);
            } catch (Throwable t) {
                // Roll the in-memory registration back so the type system stays consistent
                // with the store.
                typeSystem.removeTypes(ImmutableList.copyOf(typesAdded.keySet()));
                throw new MetadataException(t);
            }

            JSONObject response = new JSONObject();
            response.put(MetadataServiceClient.TYPES, typesAdded.keySet());
            return response;
        } catch (JSONException e) {
            LOG.error("Unable to create response for types={}", typeDefinition, e);
            throw new MetadataException("Unable to create response");
        }
    }

    /**
     * Return the definition for the given type.
     *
     * @param typeName name for this type, must be unique
     * @return type definition as JSON
     */
    @Override
    public String getTypeDefinition(String typeName) throws MetadataException {
        final IDataType dataType = typeSystem.getDataType(IDataType.class, typeName);
        return TypesSerialization.toJson(typeSystem, dataType.getName());
    }

    /**
     * Return the list of types in the repository.
     *
     * @return list of type names in the repository
     */
    @Override
    public List<String> getTypeNamesList() throws MetadataException {
        return typeSystem.getTypeNames();
    }

    /**
     * Return the list of type names in the type system for the given category.
     *
     * @return list of type names of the given category
     */
    @Override
    public List<String> getTypeNamesByCategory(DataTypes.TypeCategory typeCategory) throws MetadataException {
        return typeSystem.getTypeNamesByCategory(typeCategory);
    }

    /**
     * Creates an entity, instance of the type.
     *
     * @param entityInstanceDefinition definition
     * @return guid
     */
    @Override
    public String createEntity(String entityInstanceDefinition) throws MetadataException {
        ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty");

        ITypedReferenceableInstance entityTypedInstance =
                deserializeClassInstance(entityInstanceDefinition);

        final String guid = repository.createEntity(entityTypedInstance);

        onEntityAddedToRepo(entityTypedInstance);
        return guid;
    }

    /**
     * Deserializes an entity JSON payload and converts it to a typed instance of its
     * declared class type.
     *
     * @throws IllegalArgumentException if the JSON cannot be parsed
     * @throws MetadataException if the declared type is unknown or conversion fails
     */
    private ITypedReferenceableInstance deserializeClassInstance(
            String entityInstanceDefinition) throws MetadataException {

        final Referenceable entityInstance;
        try {
            entityInstance = InstanceSerialization.fromJsonReferenceable(
                    entityInstanceDefinition, true);
        } catch (Exception e) { // exception from deserializer
            LOG.error("Unable to deserialize json={}", entityInstanceDefinition, e);
            // FIX: preserve the cause instead of dropping it.
            throw new IllegalArgumentException("Unable to deserialize json", e);
        }
        final String entityTypeName = entityInstance.getTypeName();
        ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null");

        ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName);
        return entityType.convert(entityInstance, Multiplicity.REQUIRED);
    }

    /**
     * Return the definition for the given guid.
     *
     * @param guid guid
     * @return entity definition as JSON
     */
    @Override
    public String getEntityDefinition(String guid) throws MetadataException {
        ParamChecker.notEmpty(guid, "guid cannot be null");

        final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid);
        return InstanceSerialization.toJson(instance, true);
    }

    /**
     * Return the list of entity names for the given type in the repository.
     *
     * @param entityType type
     * @return list of entity names for the given type in the repository
     */
    @Override
    public List<String> getEntityList(String entityType) throws MetadataException {
        validateTypeExists(entityType);

        return repository.getEntityList(entityType);
    }

    /**
     * Updates a single property of the entity identified by guid.
     */
    @Override
    public void updateEntity(String guid, String property, String value) throws MetadataException {
        ParamChecker.notEmpty(guid, "guid cannot be null");
        ParamChecker.notEmpty(property, "property cannot be null");
        ParamChecker.notEmpty(value, "property value cannot be null");

        repository.updateEntity(guid, property, value);
    }

    /** Validates that the given type name is registered in the type system. */
    private void validateTypeExists(String entityType) throws MetadataException {
        ParamChecker.notEmpty(entityType, "entity type cannot be null");

        // verify if the type exists
        if (!typeSystem.isRegistered(entityType)) {
            throw new MetadataException("type is not defined for : " + entityType);
        }
    }

    /**
     * Gets the list of trait names for a given entity represented by a guid.
     *
     * @param guid globally unique identifier for the entity
     * @return a list of trait names for the given entity guid
     * @throws MetadataException
     */
    @Override
    public List<String> getTraitNames(String guid) throws MetadataException {
        ParamChecker.notEmpty(guid, "entity GUID cannot be null");
        return repository.getTraitNames(guid);
    }

    /**
     * Adds a new trait to an existing entity represented by a guid.
     *
     * @param guid globally unique identifier for the entity
     * @param traitInstanceDefinition trait instance json that needs to be added to entity
     * @throws MetadataException
     */
    @Override
    public void addTrait(String guid,
                         String traitInstanceDefinition) throws MetadataException {
        ParamChecker.notEmpty(guid, "entity GUID cannot be null");
        ParamChecker.notEmpty(traitInstanceDefinition, "Trait instance cannot be null");

        ITypedStruct traitInstance = deserializeTraitInstance(traitInstanceDefinition);
        final String traitName = traitInstance.getTypeName();

        // ensure trait type is already registered with the TS
        Preconditions.checkArgument(typeSystem.isRegistered(traitName),
                "trait=%s should be defined in type system before it can be added", traitName);
        // ensure trait is not already defined
        Preconditions.checkArgument(!getTraitNames(guid).contains(traitName),
                "trait=%s is already defined for entity=%s", traitName, guid);

        repository.addTrait(guid, traitInstance);

        onTraitAddedToEntity(guid, traitName);
    }

    /**
     * Deserializes a trait JSON payload and converts it to a typed struct of its
     * declared trait type.
     */
    private ITypedStruct deserializeTraitInstance(String traitInstanceDefinition)
            throws MetadataException {

        try {
            Struct traitInstance = InstanceSerialization.fromJsonStruct(
                    traitInstanceDefinition, true);
            final String entityTypeName = traitInstance.getTypeName();
            ParamChecker.notEmpty(entityTypeName, "entity type cannot be null");

            TraitType traitType = typeSystem.getDataType(TraitType.class, entityTypeName);
            return traitType.convert(
                    traitInstance, Multiplicity.REQUIRED);
        } catch (Exception e) {
            throw new MetadataException("Error deserializing trait instance", e);
        }
    }

    /**
     * Deletes a given trait from an existing entity represented by a guid.
     *
     * @param guid globally unique identifier for the entity
     * @param traitNameToBeDeleted name of the trait
     * @throws MetadataException
     */
    @Override
    public void deleteTrait(String guid,
                            String traitNameToBeDeleted) throws MetadataException {
        ParamChecker.notEmpty(guid, "entity GUID cannot be null");
        ParamChecker.notEmpty(traitNameToBeDeleted, "Trait name cannot be null");

        // ensure trait type is already registered with the TS
        Preconditions.checkArgument(typeSystem.isRegistered(traitNameToBeDeleted),
                "trait=%s should be defined in type system before it can be deleted",
                traitNameToBeDeleted);

        repository.deleteTrait(guid, traitNameToBeDeleted);

        onTraitDeletedFromEntity(guid, traitNameToBeDeleted);
    }

    /** Notifies registered type-change listeners of each newly added type. */
    private void onTypesAddedToRepo(Map<String, IDataType> typesAdded) throws MetadataException {
        for (TypesChangeListener listener : typesChangeListeners) {
            for (Map.Entry<String, IDataType> entry : typesAdded.entrySet()) {
                listener.onAdd(entry.getKey(), entry.getValue());
            }
        }
    }

    public void registerListener(TypesChangeListener listener) {
        typesChangeListeners.add(listener);
    }

    public void unregisterListener(TypesChangeListener listener) {
        typesChangeListeners.remove(listener);
    }

    /** Notifies registered entity-change listeners that an entity was created. */
    private void onEntityAddedToRepo(ITypedReferenceableInstance typedInstance)
            throws MetadataException {

        for (EntityChangeListener listener : entityChangeListeners) {
            listener.onEntityAdded(typedInstance);
        }
    }

    // FIX: first parameter was misleadingly named "typeName"; every caller passes the
    // entity GUID, so it is named accordingly here.
    private void onTraitAddedToEntity(String guid,
                                      String traitName) throws MetadataException {
        for (EntityChangeListener listener : entityChangeListeners) {
            listener.onTraitAdded(guid, traitName);
        }
    }

    // FIX: same renaming as onTraitAddedToEntity -- callers pass the entity GUID.
    private void onTraitDeletedFromEntity(String guid,
                                          String traitName) throws MetadataException {
        for (EntityChangeListener listener : entityChangeListeners) {
            listener.onTraitDeleted(guid, traitName);
        }
    }

    public void registerListener(EntityChangeListener listener) {
        entityChangeListeners.add(listener);
    }

    public void unregisterListener(EntityChangeListener listener) {
        entityChangeListeners.remove(listener);
    }
}
- * - * @param typeDefinition definition as json - * @return a unique id for this type - */ - JSONObject createType(String typeDefinition) throws MetadataException; - - /** - * Return the definition for the given type. - * - * @param typeName name for this type, must be unique - * @return type definition as JSON - */ - String getTypeDefinition(String typeName) throws MetadataException; - - /** - * Return the list of types in the type system. - * - * @return list of type names in the type system - */ - List<String> getTypeNamesList() throws MetadataException; - - /** - * Return the list of trait type names in the type system. - * - * @return list of trait type names in the type system - */ - List<String> getTypeNamesByCategory(DataTypes.TypeCategory typeCategory) throws MetadataException; - - /** - * Creates an entity, instance of the type. - * - * @param entityDefinition definition - * @return guid - */ - String createEntity(String entityDefinition) throws MetadataException; - - /** - * Return the definition for the given guid. - * - * @param guid guid - * @return entity definition as JSON - */ - String getEntityDefinition(String guid) throws MetadataException; - - /** - * Return the list of entity names for the given type in the repository. - * - * @param entityType type - * @return list of entity names for the given type in the repository - */ - List<String> getEntityList(String entityType) throws MetadataException; - - /** - * Adds the property to the given entity id(guid). - * - * @param guid entity id - * @param property property name - * @param value property value - */ - void updateEntity(String guid, String property, String value) throws MetadataException; - - // Trait management functions - /** - * Gets the list of trait names for a given entity represented by a guid. 
- * - * @param guid globally unique identifier for the entity - * @return a list of trait names for the given entity guid - * @throws MetadataException - */ - List<String> getTraitNames(String guid) throws MetadataException; - - /** - * Adds a new trait to an existing entity represented by a guid. - * - * @param guid globally unique identifier for the entity - * @param traitInstanceDefinition trait instance that needs to be added to entity - * @throws MetadataException - */ - void addTrait(String guid, - String traitInstanceDefinition) throws MetadataException; - - /** - * Deletes a given trait from an existing entity represented by a guid. - * - * @param guid globally unique identifier for the entity - * @param traitNameToBeDeleted name of the trait - * @throws MetadataException - */ - void deleteTrait(String guid, - String traitNameToBeDeleted) throws MetadataException; -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala new file mode 100755 index 0000000..05dc6a4 --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import java.util + +import com.thinkaurelius.titan.core.TitanGraph +import org.apache.atlas.query.Expressions._ +import org.apache.atlas.typesystem.ITypedStruct +import org.apache.atlas.typesystem.json.{InstanceSerialization, Serialization} +import org.apache.atlas.typesystem.persistence.StructInstance +import org.apache.atlas.typesystem.types.DataTypes.{MapType, PrimitiveType} +import org.apache.atlas.typesystem.types.{DataTypes, StructType, TypeSystem} + +/** + * Represents a Query to compute the closure based on a relationship between entities of a particular type. + * For e.g. Database Tables are related to each other to capture the '''Lineage''' of data in a Table based + * on other Tables. + * + * A Closure Query is specified by the following information: + * - The Type whose instances are in a closure relationship. For e.g. 'Table' + * - The Closure relation. This is specified as an ''Attribute path''. For e.g. if we have the following model: + * {{{ + * class Table { + * name : String, + * ... + * } + * + * class LoadTableProcess { + * name : String, + * inputTables : List[Table], + * outputTable : Table, + * ... + * } + * }}} + * ''LoadTable'' instance captures the relationship between the data in an output Table and a set of input Tables. + * In order to compute the '''Lineage''' of a Table, the ''Attribute path'' that relates 2 Tables is + * '''[(LoadTableProcess,outputTable), inputTables]'''. 
This list is saying that for any Table I want to connect to other + * tables via the LoadProcess.outputTable attribute, and then via the inputTables attribute. So each entry in the + * Attribute Path represents an attribute in an object. For reverse relations the Type and attribute must be specified, + * as in 'LoadTableProcess,outputTable)', whereas for forward relations the attribute name is sufficient. + * - The depth of the traversal. Certain times you are not interested in the complete closure, but to only + * discover related instances up to a certain depth. Specify the depth as number of hops, or you can ask for the + * complete closure. + * - You can ask for certain attributes to be returned. For e.g. you may only want the Table name, owner and + * creationDate. By default only the Ids of the related instances is returned. + * - For pair of related instances, you optionally ask for the Path of the relation to be returned. This is + * returned as a list of ''Id''s. + * + * Given these 5 things the ClosureQuery can be executed, it returns a GremlinQueryResult of the Closure Query. + */ +trait ClosureQuery { + + val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX + val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX + + sealed trait PathAttribute { + + def toExpr : Expression = this match { + case r : Relation => id(r.attributeName) + case rr : ReverseRelation => id(s"${rr.typeName}->${rr.attributeName}") + } + + def toFieldName : String = this match { + case r : Relation => r.attributeName + case rr : ReverseRelation => rr.typeName + } + } + case class ReverseRelation(typeName : String, attributeName : String) extends PathAttribute + case class Relation(attributeName : String) extends PathAttribute + + /** + * Type on whose instances the closure needs to be computed + * @return + */ + def closureType : String + + /** + * specify how instances are related. 
+ */ + def closureRelation : List[PathAttribute] + + /** + * The maximum hops between related instances. A [[None]] implies there is maximum. + * @return + */ + def depth : Option[Int] + + /** + * The attributes to return for the instances. These will be prefixed by 'src_' and 'dest_' in the + * output rows. + * @return + */ + def selectAttributes : Option[List[String]] + + /** + * specify if the Path should be returned. + * @return + */ + def withPath : Boolean + + def persistenceStrategy: GraphPersistenceStrategies + def g: TitanGraph + + def pathExpr : Expressions.Expression = { + closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName)) + } + + def selectExpr(alias : String) : List[Expression] = { + selectAttributes.map { _.map { a => + id(alias).field(a).as(s"${alias}_$a") + } + }.getOrElse(List(id(alias))) + } + + /** + * hook to allow a filter to be added for the closureType + * @param expr + * @return + */ + def srcCondition(expr : Expression) : Expression = expr + + def expr : Expressions.Expression = { + val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX). + select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*) + if (withPath) e.path else e + } + + def evaluate(): GremlinQueryResult = { + var e = expr + QueryProcessor.evaluate(e, g, persistenceStrategy) + } + + def graph : GraphResult = { + + if (!withPath) { + throw new ExpressionException(expr, "Graph requested for non Path Query") + } + + import scala.collection.JavaConverters._ + + val res = evaluate() + + val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType]) + val vertexPayloadType = { + val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName). 
+ dataType().asInstanceOf[MapType] + mT.getValueType.asInstanceOf[StructType] + } + + def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME) + + def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = { + val vP = vertexPayloadType.createInstance() + vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj) + vertexPayloadType.fieldMapping.fields.asScala.keys. + filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).foreach{a => + vP.set(a, resRow.get(s"${attrPrefix}$a")) + } + vP.asInstanceOf[StructInstance] + } + + val instance = graphResType.createInstance() + val vertices = new util.HashMap[String, AnyRef]() + val edges = new util.HashMap[String,java.util.List[String]]() + + /** + * foreach resultRow + * for each Path entry + * add an entry in the edges Map + * add an entry for the Src Vertex to the vertex Map + * add an entry for the Dest Vertex to the vertex Map + */ + res.rows.map(_.asInstanceOf[StructInstance]).foreach { r => + + val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala + val srcVertex = path.head.asInstanceOf[StructInstance] + + var currVertex = srcVertex + path.tail.foreach { n => + val nextVertex = n.asInstanceOf[StructInstance] + val iList = if (!edges.containsKey(id(currVertex))) { + val l = new util.ArrayList[String]() + edges.put(id(currVertex), l) + l + } else { + edges.get(id(currVertex)) + } + if ( !iList.contains(id(nextVertex))) { + iList.add(id(nextVertex)) + } + currVertex = nextVertex + } + val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName) + vertices.put(id(srcVertex), vertexStruct(srcVertex, + r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], + s"${SRC_PREFIX}_")) + vertices.put(id(currVertex), vertexStruct(currVertex, + r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], + s"${DEST_PREFIX}_")) + } + + 
instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices) + instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges) + GraphResult(res.query, instance) + } +} + +/** + * Closure for a single instance. Instance is specified by an ''attributeToSelectInstance'' and the value + * for the attribute. + * + * @tparam T + */ +trait SingleInstanceClosureQuery[T] extends ClosureQuery { + + def attributeToSelectInstance : String + + def attributeTyp : PrimitiveType[T] + def instanceValue : T + + override def srcCondition(expr : Expression) : Expression = { + expr.where( + Expressions.id(attributeToSelectInstance).`=`(Expressions.literal(attributeTyp, instanceValue)) + ) + } +} + +/** + * A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS'' + * type, and the table relations are captured as attributes from a CTAS instance to Table instances. + * + * @param tableTypeName The name of the Table Type. + * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. + * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. + * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. + * @param depth depth as needed by the closure Query. + * @param selectAttributes as needed by the closure Query. + * @param withPath as needed by the closure Query. + * @param persistenceStrategy as needed to evaluate the Closure Query. + * @param g as needed to evaluate the Closure Query. 
+ */ +case class HiveLineageQuery(tableTypeName : String, + tableName : String, + ctasTypeName : String, + ctasInputTableAttribute : String, + ctasOutputTableAttribute : String, + depth : Option[Int], + selectAttributes : Option[List[String]], + withPath : Boolean, + persistenceStrategy: GraphPersistenceStrategies, + g: TitanGraph + ) extends SingleInstanceClosureQuery[String] { + + val closureType : String = tableTypeName + + val attributeToSelectInstance = "name" + val attributeTyp = DataTypes.STRING_TYPE + + val instanceValue = tableName + + lazy val closureRelation = List( + ReverseRelation(ctasTypeName, ctasOutputTableAttribute), + Relation(ctasInputTableAttribute) + ) +} + +/** + * A ClosureQuery to compute where a table is used based on the '''Lineage''' for Hive tables. + * Assumes the Lineage relation is captured in a ''CTAS'' + * type, and the table relations are captured as attributes from a CTAS instance to Table instances. + * + * @param tableTypeName The name of the Table Type. + * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. + * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. + * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. + * @param depth depth as needed by the closure Query. + * @param selectAttributes as needed by the closure Query. + * @param withPath as needed by the closure Query. + * @param persistenceStrategy as needed to evaluate the Closure Query. + * @param g as needed to evaluate the Closure Query. 
+ */ +case class HiveWhereUsedQuery(tableTypeName : String, + tableName : String, + ctasTypeName : String, + ctasInputTableAttribute : String, + ctasOutputTableAttribute : String, + depth : Option[Int], + selectAttributes : Option[List[String]], + withPath : Boolean, + persistenceStrategy: GraphPersistenceStrategies, + g: TitanGraph + ) extends SingleInstanceClosureQuery[String] { + + val closureType : String = tableTypeName + + val attributeToSelectInstance = "name" + val attributeTyp = DataTypes.STRING_TYPE + + val instanceValue = tableName + + lazy val closureRelation = List( + ReverseRelation(ctasTypeName, ctasInputTableAttribute), + Relation(ctasOutputTableAttribute) + ) +} + +case class GraphResult(query: String, result : ITypedStruct) { + + def toTypedJson = Serialization.toJson(result) + + def toInstanceJson = InstanceSerialization.toJson(result) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/Expressions.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/Expressions.scala b/repository/src/main/scala/org/apache/atlas/query/Expressions.scala new file mode 100755 index 0000000..ed57a91 --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/Expressions.scala @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import org.apache.atlas.MetadataException +import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory} +import org.apache.atlas.typesystem.types._ + +object Expressions { + + import TypeUtils._ + + class ExpressionException(val e: Expression, message: String, cause: Throwable, enableSuppression: Boolean, + writableStackTrace: Boolean) + extends MetadataException(message, cause, enableSuppression, writableStackTrace) { + + def this(e: Expression, message: String) { + this(e, message, null, false, false) + } + + def this(e: Expression, message: String, cause: Throwable) { + this(e, message, cause, false, false) + } + + def this(e: Expression, cause: Throwable) { + this(e, null, cause, false, false) + } + + override def getMessage: String = { + val eString = e.toString + s"${super.getMessage}, expression:${if (eString contains "\n") "\n" else " "}$e" + } + + } + + class UnresolvedException(expr: Expression, function: String) extends + ExpressionException(expr, s"Unresolved $function") + + def attachExpression[A](e: Expression, msg: String = "")(f: => A): A = { + try f catch { + case eex: ExpressionException => throw eex + case ex: Exception => throw new ExpressionException(e, msg, ex) + } + } + + trait Expression { + self: Product => + + def children: Seq[Expression] + + /** + * Returns `true` if the schema for this expression and all its children have been resolved. + * The default logic is that an Expression is resolve if all its children are resolved. 
+ */ + lazy val resolved: Boolean = childrenResolved + + /** + * Returns the output [[IDataType[_]] of this expression. Expressions that are unresolved will + * throw if this method is invoked. + */ + def dataType: IDataType[_] + + /** + * Returns true if all the children have been resolved. + */ + def childrenResolved = !children.exists(!_.resolved) + + + /** + * the aliases that are present in this Expression Tree + */ + def namedExpressions: Map[String, Expression] = Map() + + def fastEquals(other: Expression): Boolean = { + this.eq(other) || this == other + } + + def makeCopy(newArgs: Array[AnyRef]): this.type = attachExpression(this, "makeCopy") { + try { + val defaultCtor = getClass.getConstructors.find(_.getParameterTypes.size != 0).head + defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] + } catch { + case e: java.lang.IllegalArgumentException => + throw new ExpressionException( + this, s"Failed to copy node. Reason: ${e.getMessage}.") + } + } + + def transformChildrenDown(rule: PartialFunction[Expression, Expression]): this.type = { + var changed = false + val newArgs = productIterator.map { + case arg: Expression if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformDown(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case Some(arg: Expression) if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformDown(rule) + if (!(newChild fastEquals arg)) { + changed = true + Some(newChild) + } else { + Some(arg) + } + case m: Map[_, _] => m + case args: Traversable[_] => args.map { + case arg: Expression if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformDown(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case other => other + } + case nonChild: AnyRef => nonChild + case null => null + }.toArray + if (changed) makeCopy(newArgs) else this + } + + def transformDown(rule: 
PartialFunction[Expression, Expression]): Expression = { + val afterRule = rule.applyOrElse(this, identity[Expression]) + // Check if unchanged and then possibly return old copy to avoid gc churn. + if (this fastEquals afterRule) { + transformChildrenDown(rule) + } else { + afterRule.transformChildrenDown(rule) + } + } + + def traverseChildren(traverseFunc: (Expression, PartialFunction[Expression, Unit]) => Unit) + (rule: PartialFunction[Expression, Unit]): Unit = { + productIterator.foreach { + case arg: Expression if children contains arg => + traverseFunc(arg.asInstanceOf[Expression], rule) + case Some(arg: Expression) if children contains arg => + traverseFunc(arg.asInstanceOf[Expression], rule) + case m: Map[_, _] => m + case args: Traversable[_] => args.map { + case arg: Expression if children contains arg => + traverseFunc(arg.asInstanceOf[Expression], rule) + case other => other + } + case nonChild: AnyRef => nonChild + case null => null + } + } + + def traverseChildrenDown = traverseChildren(_traverseDown) _ + + private def _traverseDown(e: Expression, rule: PartialFunction[Expression, Unit]): Unit = { + if (rule.isDefinedAt(e)) { + rule.apply(e) + } + e.traverseChildrenDown(rule) + } + + def traverseDown(rule: PartialFunction[Expression, Unit]): Unit = { + _traverseDown(this, rule) + } + + def traverseChildrenUp = traverseChildren(_traverseUp) _ + + private def _traverseUp(e: Expression, rule: PartialFunction[Expression, Unit]): Unit = { + e.traverseChildrenUp(rule) + if (rule.isDefinedAt(e)) { + rule.apply(e) + } + } + + def traverseUp(rule: PartialFunction[Expression, Unit]): Unit = { + _traverseUp(this, rule) + } + + def transformUp(rule: PartialFunction[Expression, Expression]): Expression = { + val afterRuleOnChildren = transformChildrenUp(rule); + if (this fastEquals afterRuleOnChildren) { + rule.applyOrElse(this, identity[Expression]) + } else { + rule.applyOrElse(afterRuleOnChildren, identity[Expression]) + } + } + + def transformChildrenUp(rule: 
PartialFunction[Expression, Expression]): this.type = { + var changed = false + val newArgs = productIterator.map { + case arg: Expression if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformUp(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case Some(arg: Expression) if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformUp(rule) + if (!(newChild fastEquals arg)) { + changed = true + Some(newChild) + } else { + Some(arg) + } + case m: Map[_, _] => m + case args: Traversable[_] => args.map { + case arg: Expression if children contains arg => + val newChild = arg.asInstanceOf[Expression].transformUp(rule) + if (!(newChild fastEquals arg)) { + changed = true + newChild + } else { + arg + } + case other => other + } + case nonChild: AnyRef => nonChild + case null => null + }.toArray + if (changed) makeCopy(newArgs) else this + } + + /* + * treeString methods + */ + def nodeName = getClass.getSimpleName + + def argString: String = productIterator.flatMap { + case e: Expression if children contains e => Nil + case e: Expression if e.toString contains "\n" => s"(${e.simpleString})" :: Nil + case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil + case set: Set[_] => set.mkString("{", ",", "}") :: Nil + case f: IDataType[_] => f.getName :: Nil + case other => other :: Nil + }.mkString(", ") + + /** String representation of this node without any children */ + def simpleString = s"$nodeName $argString" + + protected def generateTreeString(depth: Int, builder: StringBuilder): StringBuilder = { + builder.append(" " * depth) + builder.append(simpleString) + builder.append("\n") + children.foreach(_.generateTreeString(depth + 1, builder)) + builder + } + + def treeString = generateTreeString(0, new StringBuilder).toString + + /* + * Fluent API methods + */ + def field(fieldName: String) = new UnresolvedFieldExpression(this, fieldName) + + def join(fieldName: String) = 
field(fieldName)

    // Dotted-field access, e.g. `expr.`("name") — alias for field().
    def `.`(fieldName: String) = field(fieldName)

    // Name this expression so later expressions can refer back to it.
    def as(alias: String) = new AliasExpression(this, alias)

    // Curried arithmetic builder: arith("+")(rhs) == this + rhs.
    def arith(op: String)(rightExpr: Expression) = new ArithmeticExpression(op, this, rightExpr)

    def + = arith("+") _

    def - = arith("-") _

    def * = arith("*") _

    def / = arith("/") _

    def % = arith("%") _

    // `this is <trait>` — test whether the instance carries the named trait.
    def isTrait(name: String) = new isTraitUnaryExpression(name, this)

    // `this has <field>` — test whether the instance defines the named field.
    def hasField(name: String) = new hasFieldUnaryExpression(name, this)

    // Curried comparison builder: compareOp(">")(rhs) == this > rhs.
    def compareOp(op: String)(rightExpr: Expression) = new ComparisonExpression(op, this, rightExpr)

    def `=` = compareOp("=") _

    def `!=` = compareOp("!=") _

    def `>` = compareOp(">") _

    def `>=` = compareOp(">=") _

    def `<` = compareOp("<") _

    // BUG FIX: was compareOp("=") — `<=` silently built an equality comparison.
    def `<=` = compareOp("<=") _

    // Curried boolean builder shared by `and`/`or`.
    def logicalOp(op: String)(rightExpr: Expression) = new LogicalExpression(op, List(this, rightExpr))

    def and = logicalOp("and") _

    def or = logicalOp("or") _

    // Filter this expression by a boolean condition.
    def where(condExpr: Expression) = new FilterExpression(this, condExpr)

    // Project a list of expressions from this one.
    def select(selectList: Expression*) = new SelectExpression(this, selectList.toList)

    // Repeatedly apply loopingExpr until it yields nothing (unbounded).
    def loop(loopingExpr: Expression) = new LoopExpression(this, loopingExpr, None)

    // Repeatedly apply loopingExpr a fixed number of times.
    def loop(loopingExpr: Expression, times: Literal[Integer]) =
        new LoopExpression(this, loopingExpr, Some(times))

    def traitInstance() = new TraitInstanceExpression(this)

    def instance() = new InstanceExpression(this)

    // Augment the result with the traversal path that produced it.
    def path() = new PathExpression(this)
  }

  /** Mixin for expressions with exactly two operands. */
  trait BinaryNode {
    self: Expression =>
    def left: Expression

    def right: Expression

    def children = Seq(left, right)

    override def namedExpressions = left.namedExpressions ++ right.namedExpressions
  }

  /** Mixin for expressions with no operands. */
  trait LeafNode {
    def children = Nil
  }

  /** Mixin for expressions with exactly one operand. */
  trait UnaryNode {
    self: Expression =>
    def child: Expression

    override def namedExpressions = child.namedExpressions

    def children = child :: Nil
  }

  /** Base for infix binary operators; subclasses supply the operator symbol. */
  abstract class BinaryExpression extends Expression with BinaryNode {
    self: Product =>
    def symbol: String

    override def toString = s"($left $symbol $right)"
  }

  /** Reference to a class type by name; fails construction if the type is unknown. */
  case class ClassExpression(clsName: String) extends Expression with LeafNode {
    val dataType = typSystem.getDataType(classOf[ClassType], clsName)

    override def toString = clsName
  }

  def _class(name: String): Expression = new ClassExpression(name)

  /** Reference to a trait type by name; fails construction if the type is unknown. */
  case class TraitExpression(traitName: String) extends Expression with LeafNode {
    val dataType = typSystem.getDataType(classOf[TraitType], traitName)

    override def toString = traitName
  }

  def _trait(name: String) = new TraitExpression(name)

  /** Bare identifier; always unresolved until the resolver binds it. */
  case class IdExpression(name: String) extends Expression with LeafNode {
    override def toString = name

    override lazy val resolved = false

    override def dataType = throw new UnresolvedException(this, "id")
  }

  def id(name: String) = new IdExpression(name)

  /** Field access whose target field has not been resolved against the type system yet. */
  case class UnresolvedFieldExpression(child: Expression, fieldName: String) extends Expression
  with UnaryNode {
    override def toString = s"${child}.$fieldName"

    override lazy val resolved = false

    override def dataType = throw new UnresolvedException(this, "field")
  }

  /** Resolved field access; `child` is absent when the field is accessed on the implicit input. */
  case class FieldExpression(fieldName: String, fieldInfo: FieldInfo, child: Option[Expression])
    extends Expression {

    // Arrays of class/struct elements expose the element type, not the array type.
    def elemType(t: IDataType[_]): IDataType[_] = {
      if (t.getTypeCategory == TypeCategory.ARRAY) {
        val aT = t.asInstanceOf[ArrayType]
        if (aT.getElemType.getTypeCategory == TypeCategory.CLASS ||
          aT.getElemType.getTypeCategory == TypeCategory.STRUCT) {
          return aT.getElemType
        }
      }
      t
    }

    val children = if (child.isDefined) List(child.get) else Nil

    import scala.language.existentials

    lazy val dataType = {
      val t = {
        if (fieldInfo.traitName != null) {
          // Trait-qualified access resolves to the trait's own type.
          typSystem.getDataType(classOf[TraitType], fieldInfo.traitName)
        } else if (!fieldInfo.isReverse) {
          fieldInfo.attrInfo.dataType()
        } else {
          // Reverse (incoming-edge) fields carry their own declared type.
          fieldInfo.reverseDataType
        }
      }
      elemType(t)
    }

    override lazy val resolved: Boolean = true

    override def namedExpressions = if (child.isDefined) child.get.namedExpressions else Map()

    override def toString = {
      if (child.isDefined) {
        // Class-valued fields print space-separated; scalar fields print dotted.
        val sep = if (dataType.isInstanceOf[ClassType]) " " else "."
        s"${child.get}${sep}$fieldName"
      } else {
        fieldName
      }
    }
  }

  /** Binds `alias` to `child` in the named-expression environment. */
  case class AliasExpression(child: Expression, alias: String) extends Expression with UnaryNode {
    override def namedExpressions = child.namedExpressions + (alias -> child)

    override def toString = s"$child as $alias"

    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      child.dataType
    }
  }

  /** Resolved reference to a previously aliased expression. */
  case class BackReference(alias: String, reference: Expression, child: Option[Expression]) extends Expression {
    val children = if (child.isDefined) List(child.get) else Nil
    val dataType = reference.dataType

    override def namedExpressions = if (child.isDefined) child.get.namedExpressions else Map()

    override def toString = if (child.isDefined) s"${child.get} $alias" else alias
  }

  /** Constant of a primitive type; null raw values map to the type's null value. */
  case class Literal[T](dataType: PrimitiveType[T], rawValue: Any) extends Expression with LeafNode {
    val value = if (rawValue == null) dataType.nullValue() else dataType.convert(rawValue, Multiplicity.REQUIRED)

    override def toString = value match {
      case s: String => s""""$s""""
      case x => x.toString
    }
  }

  def literal[T](typ: PrimitiveType[T], rawValue: Any) = new Literal[T](typ, rawValue)

  def boolean(rawValue: Any) = literal(DataTypes.BOOLEAN_TYPE, rawValue)

  def byte(rawValue: Any) = literal(DataTypes.BYTE_TYPE, rawValue)

  def short(rawValue: Any) = literal(DataTypes.SHORT_TYPE, rawValue)

  def int(rawValue: Any) = literal(DataTypes.INT_TYPE, rawValue)

  def long(rawValue: Any) = literal(DataTypes.LONG_TYPE, rawValue)

  def float(rawValue: Any) = literal(DataTypes.FLOAT_TYPE, rawValue)

  def double(rawValue: Any) = literal(DataTypes.DOUBLE_TYPE, rawValue)

  def bigint(rawValue: Any) = literal(DataTypes.BIGINTEGER_TYPE, rawValue)

  def bigdecimal(rawValue: Any) = literal(DataTypes.BIGDECIMAL_TYPE, rawValue)

  def string(rawValue: Any) = literal(DataTypes.STRING_TYPE, rawValue)

  /** +, -, *, /, % — result type is the numeric combination of both operands. */
  case class ArithmeticExpression(symbol: String,
                                  left: Expression,
                                  right: Expression)
    extends BinaryExpression {

    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      TypeUtils.combinedType(left.dataType, right.dataType)
    }
  }

  /** `is <trait>` with an optional class context; unresolved until the context is bound. */
  case class isTraitLeafExpression(traitName: String, classExpression: Option[Expression] = None)
    extends Expression with LeafNode {
    // Validate the trait name eagerly so a bad name fails at construction.
    try {
      typSystem.getDataType(classOf[TraitType], traitName)
    } catch {
      case me: MetadataException => throw new ExpressionException(this, "not a TraitType", me)
    }

    override lazy val resolved = classExpression.isDefined
    lazy val dataType = {

      if (!resolved) {
        throw new UnresolvedException(this,
          s"cannot resolve isTrait application")
      }

      if (!classExpression.get.dataType.isInstanceOf[ClassType]) {
        throw new ExpressionException(this,
          s"Cannot apply isTrait on ${classExpression.get.dataType.getName}, it is not a ClassType")
      }
      DataTypes.BOOLEAN_TYPE
    }

    override def toString = s"${classExpression.getOrElse("")} is $traitName"
  }

  def isTrait(name: String) = new isTraitLeafExpression(name)

  /** `child is <trait>` — boolean test that child (a class instance) carries the trait. */
  case class isTraitUnaryExpression(traitName: String, child: Expression)
    extends Expression with UnaryNode {
    // Validate the trait name eagerly; throws if unknown.
    typSystem.getDataType(classOf[TraitType], traitName)
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      if (!child.dataType.isInstanceOf[ClassType]) {
        throw new ExpressionException(this,
          s"Cannot apply isTrait on ${child.dataType.getName}, it is not a ClassType")
      }
      DataTypes.BOOLEAN_TYPE
    }

    override def toString = s"$child is $traitName"
  }

  /** `has <field>` with an optional class context; unresolved until the context is bound. */
  case class hasFieldLeafExpression(fieldName: String, classExpression: Option[Expression] = None)
    extends Expression with LeafNode {

    override lazy val resolved = classExpression.isDefined
    lazy val dataType = {
      if (!resolved) {
        // BUG FIX: the old message dereferenced classExpression.get here, but this branch
        // runs exactly when classExpression is None — it threw NoSuchElementException
        // instead of the intended UnresolvedException.
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      if (classExpression.isDefined && !TypeUtils.fieldMapping(classExpression.get.dataType).isDefined) {
        throw new ExpressionException(this, s"Cannot apply hasField on ${classExpression.get.dataType.getName}")
      }
      DataTypes.BOOLEAN_TYPE
    }

    override def toString = s"${classExpression.getOrElse("")} has $fieldName"
  }

  def hasField(name: String) = new hasFieldLeafExpression(name)

  /** `child has <field>` — boolean test that child's type defines the field. */
  case class hasFieldUnaryExpression(fieldName: String, child: Expression)
    extends Expression with UnaryNode {
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      if (!TypeUtils.fieldMapping(child.dataType).isDefined) {
        // NOTE(review): throws MetadataException while siblings throw ExpressionException —
        // kept as-is since callers may catch the specific type.
        throw new MetadataException(s"Cannot apply hasField on ${child.dataType.getName}")
      }
      DataTypes.BOOLEAN_TYPE
    }

    override def toString = s"$child has $fieldName"
  }

  /** =, !=, >, >=, <, <= — always boolean-typed. */
  case class ComparisonExpression(symbol: String,
                                  left: Expression,
                                  right: Expression)
    extends BinaryExpression {

    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      if (left.dataType != DataTypes.STRING_TYPE || right.dataType != DataTypes.STRING_TYPE) {
        // Result discarded: combinedType acts as a compatibility check and throws on mismatch.
        TypeUtils.combinedType(left.dataType, right.dataType)
      }
      DataTypes.BOOLEAN_TYPE
    }
  }

  /** n-ary and/or; every child must be boolean. */
  case class LogicalExpression(symbol: String, children: List[Expression])
    extends Expression {
    assert(children.size > 0)
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      children.foreach { childExpr =>
        if (childExpr.dataType != DataTypes.BOOLEAN_TYPE) {
          throw new MetadataException(
            s"Cannot apply logical operator '$symbol' on input of type '${childExpr.dataType}")
        }
      }
      DataTypes.BOOLEAN_TYPE
    }

    override def toString = children.mkString("", s" $symbol ", "")
  }

  /** `child where cond` — keeps child's type; cond must be boolean. */
  case class FilterExpression(val child: Expression, val condExpr: Expression) extends Expression {
    val children = List(child, condExpr)
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      if (condExpr.dataType != DataTypes.BOOLEAN_TYPE) {
        throw new ExpressionException(this, s"Filter condition '$condExpr' is not a boolean expression")
      }
      child.dataType
    }

    override def namedExpressions = child.namedExpressions ++ condExpr.namedExpressions

    override def toString = s"$child where $condExpr"
  }

  // Prefix for auto-generated column aliases in select lists.
  val GEN_COL_ALIAS_PREFIX = "_col"

  /** `child select e1, e2, ...` — result is a struct of the (aliased) select list. */
  case class SelectExpression(child: Expression, selectList: List[Expression]) extends Expression {

    val children = List(child) ::: selectList

    // Unaliased select items get generated aliases _col_0, _col_1, ...
    lazy val selectListWithAlias = selectList.zipWithIndex map {
      case (s: AliasExpression, _) => s
      case (x, i) => new AliasExpression(x, s"${GEN_COL_ALIAS_PREFIX}_$i")
    }

    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      TypeUtils.createStructType(selectListWithAlias)
    }

    override def namedExpressions = child.namedExpressions ++ (selectList.flatMap(_.namedExpressions))

    override def toString = s"""$child select ${selectListWithAlias.mkString("", ", ", "")}"""
  }

  /** Repeated traversal: input and loop body must both be the same class type. */
  case class LoopExpression(val input: Expression, val loopingExpression: Expression,
                            val times: Option[Literal[Integer]]) extends Expression {
    val children = List(input, loopingExpression)
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved children")
      }
      if (input.dataType.getTypeCategory != TypeCategory.CLASS) {
        throw new ExpressionException(this, s"Loop Expression applied to type : '${input.dataType.getName}';" +
          " loop can only be applied to Class Expressions")
      }
      if (input.dataType != loopingExpression.dataType) {
        // BUG FIX: message previously ended "...getName}})" — stray '}' in the rendered text.
        throw new ExpressionException(this,
          s"Invalid Loop Expression; input and loopExpression dataTypes don't match: " +
            s"(${input.dataType.getName},${loopingExpression.dataType.getName})")
      }
      input.dataType
    }

    override def namedExpressions = input.namedExpressions

    override def toString = {
      if (times.isDefined) s"$input loop ($loopingExpression) times ${times.get.value}"
      else s"$input loop ($loopingExpression)"
    }
  }

  /** Projects the trait instance itself; child must be trait-typed. Result is the Id struct. */
  case class TraitInstanceExpression(child: Expression)
    extends Expression with UnaryNode {
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      if (!child.dataType.isInstanceOf[TraitType]) {
        throw new ExpressionException(this,
          s"Cannot apply instance on ${child.dataType.getName}, it is not a TraitType")
      }
      typSystem.getIdType.getStructType
    }

    override def toString = s"$child traitInstance"
  }

  /** Projects the instance identity (Id struct) of the child. */
  case class InstanceExpression(child: Expression)
    extends Expression with UnaryNode {
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      typSystem.getIdType.getStructType
    }

    override def toString = s"$child instance"
  }

  /** Wraps the child's result together with the traversal path that produced it. */
  case class PathExpression(child: Expression)
    extends Expression with UnaryNode {
    lazy val dataType = {
      if (!resolved) {
        throw new UnresolvedException(this,
          s"datatype. Can not resolve due to unresolved child")
      }
      TypeUtils.ResultWithPathStruct.createType(this, child.dataType)
    }

    override def toString = s"$child withPath"
  }
}
