http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java deleted file mode 100755 index 9eb695c..0000000 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ /dev/null @@ -1,821 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.services; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.CreateUpdateEntitiesResult; -import org.apache.atlas.EntityAuditEvent; -import org.apache.atlas.RequestContext; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.ha.HAConfiguration; -import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.listener.ChangedTypeDefs; -import org.apache.atlas.listener.EntityChangeListener; -import org.apache.atlas.listener.TypeDefChangeListener; -import org.apache.atlas.listener.TypesChangeListener; -import org.apache.atlas.model.legacy.EntityResult; -import org.apache.atlas.repository.MetadataRepository; -import org.apache.atlas.repository.RepositoryException; -import org.apache.atlas.repository.audit.EntityAuditRepository; -import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.repository.typestore.ITypeStore; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.ITypedStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.typesystem.exception.TypeNotFoundException; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.typesystem.json.TypesSerialization; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.persistence.ReferenceableInstance; -import org.apache.atlas.typesystem.types.*; -import org.apache.atlas.typesystem.types.cache.TypeCache; -import org.apache.atlas.utils.ParamChecker; -import org.apache.commons.configuration.Configuration; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import javax.inject.Singleton; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - - - -/** - * Simple wrapper over TypeSystem and MetadataRepository services with hooks - * for listening to changes to the repository. - */ -@Singleton -@Component -@Deprecated -public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler, TypeDefChangeListener { - private enum OperationType { - CREATE, UPDATE, DELETE - }; - - private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); - private final short maxAuditResults; - private static final String CONFIG_MAX_AUDIT_RESULTS = "atlas.audit.maxResults"; - private static final short DEFAULT_MAX_AUDIT_RESULTS = 1000; - - private final TypeSystem typeSystem; - private final MetadataRepository repository; - private final ITypeStore typeStore; - - private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>(); - private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>(); - - private EntityAuditRepository auditRepository; - - @Inject - public DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, - final Set<TypesChangeListener> typesChangeListeners, - final Set<EntityChangeListener> entityChangeListeners, - final TypeSystem typeSystem, - final Configuration configuration, - TypeCache typeCache, - EntityAuditRepository auditRepository) throws AtlasException { - this.typeStore = typeStore; - this.typeSystem = typeSystem; - /** - * Ideally a TypeCache implementation should have been injected in the TypeSystemProvider, - * but a singleton of TypeSystem is constructed privately within the class so that - * clients of TypeSystem would never instantiate a TypeSystem object directly in - * their code. As soon as a client makes a call to TypeSystem.getInstance(), they - * should have the singleton ready for consumption. Manually inject TypeSystem with - * the Guice-instantiated type cache here, before types are restored. - * This allows cache implementations to participate in Guice dependency injection. - */ - this.typeSystem.setTypeCache(typeCache); - - this.repository = repository; - - this.typeChangeListeners.addAll(typesChangeListeners); - - this.entityChangeListeners.addAll(entityChangeListeners); - - if (!HAConfiguration.isHAEnabled(configuration)) { - restoreTypeSystem(); - } - - maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS); - - this.auditRepository = auditRepository; - } - - private void restoreTypeSystem() throws AtlasException { - LOG.info("Restoring type system from the store"); - - TypesDef typesDef = typeStore.restore(); - - refreshCache(typesDef); - - LOG.info("Restored type system from the store"); - } - - private void refreshCache(TypesDef typesDef) throws AtlasException { - if (typesDef != null && !typesDef.isEmpty()) { - TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, true); - Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); - LOG.info("Number of types got from transient type system: {}", typesAdded.size()); - typeSystem.commitTypes(typesAdded); - } - } - - /** - * Creates a new type based on the type system to enable adding - * entities (instances for types). - * - * @param typeDefinition definition as json - * @return a unique id for this type - */ - @Override - public JSONObject createType(String typeDefinition) throws AtlasException { - return createOrUpdateTypes(OperationType.CREATE, typeDefinition, false); - } - - private JSONObject createOrUpdateTypes(OperationType opType, String typeDefinition, boolean isUpdate) throws AtlasException { - typeDefinition = ParamChecker.notEmpty(typeDefinition, "type definition"); - TypesDef typesDef = validateTypeDefinition(opType, typeDefinition); - - - try { - final TypeSystem.TransientTypeSystem transientTypeSystem = typeSystem.createTransientTypeSystem(typesDef, isUpdate); - final Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); - try { - /* Create indexes first so that if index creation fails then we rollback - the typesystem and also do not persist the graph - */ - if (isUpdate) { - onTypesUpdated(typesAdded); - } else { - onTypesAdded(typesAdded); - } - typeStore.store(transientTypeSystem, ImmutableList.copyOf(typesAdded.keySet())); - typeSystem.commitTypes(typesAdded); - } catch (Throwable t) { - throw new AtlasException("Unable to persist types ", t); - } - - return new JSONObject() {{ - put(AtlasClient.TYPES, typesAdded.keySet()); - }}; - } catch (JSONException e) { - LOG.error("Unable to create response for types={}", typeDefinition, e); - throw new AtlasException("Unable to create response ", e); - } - } - - @Override - public JSONObject updateType(String typeDefinition) throws AtlasException { - return createOrUpdateTypes(OperationType.UPDATE, typeDefinition, true); - } - - private TypesDef validateTypeDefinition(OperationType opType, String typeDefinition) throws AtlasException { - final String exceptionErrorMessageFormat = "%s for '%s' failed: %s"; - - try { - TypesDef typesDef = TypesSerialization.fromJson(typeDefinition); - if (typesDef.isEmpty()) { - throw new IllegalArgumentException("Invalid type definition"); - } - - for (HierarchicalTypeDefinition<ClassType> t : typesDef.classTypesAsJavaList()) { - if (!AtlasTypeUtil.isValidTypeName(t.typeName)) - throw new AtlasException( - String.format(exceptionErrorMessageFormat, opType.toString(), t.typeName, AtlasTypeUtil.getInvalidTypeNameErrorMessage())); - } - - for (StructTypeDefinition t : typesDef.structTypesAsJavaList()) { - if (!AtlasTypeUtil.isValidTypeName(t.typeName)) - throw new AtlasException( - String.format(exceptionErrorMessageFormat, opType.toString(), t.typeName, AtlasTypeUtil.getInvalidTypeNameErrorMessage())); - } - - for (EnumTypeDefinition t : typesDef.enumTypesAsJavaList()) { - if (!AtlasTypeUtil.isValidTypeName(t.name)) - throw new AtlasException( - String.format(exceptionErrorMessageFormat, opType.toString(), t.name, AtlasTypeUtil.getInvalidTypeNameErrorMessage())); - } - - for (HierarchicalTypeDefinition<TraitType> t : typesDef.traitTypesAsJavaList()) { - if (!AtlasTypeUtil.isValidTraitTypeName(t.typeName)) - throw new AtlasException( - String.format(exceptionErrorMessageFormat, opType.toString(), t.typeName, AtlasTypeUtil.getInvalidTraitTypeNameErrorMessage())); - } - - return typesDef; - } - catch (Exception e) { - LOG.error("Unable to deserialize json={}", typeDefinition, e); - throw new IllegalArgumentException("Unable to deserialize json " + typeDefinition, e); - } - } - - /** - * Return the definition for the given type. - * - * @param typeName name for this type, must be unique - * @return type definition as JSON - */ - @Override - public String getTypeDefinition(String typeName) throws AtlasException { - final IDataType dataType = typeSystem.getDataType(IDataType.class, typeName); - return TypesSerialization.toJson(typeSystem, dataType.getName()); - } - - /** - * Return the list of type names in the type system which match the specified filter. - * - * @return list of type names - * @param filterMap - Map of filter for type names. Valid keys are CATEGORY, SUPERTYPE, NOT_SUPERTYPE - * For example, CATEGORY = TRAIT && SUPERTYPE contains 'X' && SUPERTYPE !contains 'Y' - * If there is no filter, all the types are returned - */ - @Override - public List<String> getTypeNames(Map<TypeCache.TYPE_FILTER, String> filterMap) throws AtlasException { - return typeSystem.getTypeNames(filterMap); - } - - /** - * Creates an entity, instance of the type. - * - * @param entityInstanceDefinition json array of entity definitions - * @return guids - list of guids - */ - @Override - public CreateUpdateEntitiesResult createEntities(String entityInstanceDefinition) throws AtlasException { - entityInstanceDefinition = ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition"); - - ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition); - - return createEntities(typedInstances); - } - - public CreateUpdateEntitiesResult createEntities(ITypedReferenceableInstance[] typedInstances) throws AtlasException { - final CreateUpdateEntitiesResult result = repository.createEntities(typedInstances); - onEntitiesAdded(result.getCreatedEntities()); - return result; - } - - @Override - public ITypedReferenceableInstance[] deserializeClassInstances(String entityInstanceDefinition) throws AtlasException { - return GraphHelper.deserializeClassInstances(typeSystem, entityInstanceDefinition); - } - - @Override - public ITypedReferenceableInstance getTypedReferenceableInstance(Referenceable entityInstance) throws AtlasException { - return GraphHelper.getTypedReferenceableInstance(typeSystem, entityInstance); - } - - /** - * Return the definition for the given guid. - * - * @param guid guid - * @return entity definition as JSON - */ - @Override - public String getEntityDefinitionJson(String guid) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - - final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid); - return InstanceSerialization.toJson(instance, true); - } - - /** - * Return the definition for the given guid. - * - * @param guid guid - * @return entity definition as JSON - */ - @Override - public ITypedReferenceableInstance getEntityDefinition(String guid) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - - final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid); - return instance; - } - - @Override - public ITypedReferenceableInstance getEntityDefinitionReference(String entityType, String attribute, String value) - throws AtlasException { - validateTypeExists(entityType); - validateUniqueAttribute(entityType, attribute); - - return repository.getEntityDefinition(entityType, attribute, value); - } - - @Override - public String getEntityDefinition(String entityType, String attribute, String value) throws AtlasException { - final ITypedReferenceableInstance instance = getEntityDefinitionReference(entityType, attribute, value); - return InstanceSerialization.toJson(instance, true); - } - - /** - * Validate that attribute is unique attribute - * @param entityType the entity type - * @param attributeName the name of the attribute - */ - private void validateUniqueAttribute(String entityType, String attributeName) throws AtlasException { - ClassType type = typeSystem.getDataType(ClassType.class, entityType); - AttributeInfo attribute = type.fieldMapping().fields.get(attributeName); - if(attribute == null) { - throw new IllegalArgumentException( - String.format("%s is not an attribute in %s", attributeName, entityType)); - } - if (!attribute.isUnique) { - throw new IllegalArgumentException( - String.format("%s.%s is not a unique attribute", entityType, attributeName)); - } - } - - /** - * Return the list of entity guids for the given type in the repository. - * - * @param entityType type - * @return list of entity guids for the given type in the repository - */ - @Override - public List<String> getEntityList(String entityType) throws AtlasException { - validateTypeExists(entityType); - - return repository.getEntityList(entityType); - } - - /** - * Updates an entity, instance of the type based on the guid set. - * - * @param entityInstanceDefinition json array of entity definitions - * @return guids - json array of guids - */ - @Override - public CreateUpdateEntitiesResult updateEntities(String entityInstanceDefinition) throws AtlasException { - entityInstanceDefinition = ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition"); - ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition); - - CreateUpdateEntitiesResult result = repository.updateEntities(typedInstances); - onEntitiesAddedUpdated(result.getEntityResult()); - return result; - } - - /** - * Updates an entity, instance of the type based on the guid set. - * - * @param entityInstanceDefinitions - * @return guids - json array of guids - */ - @Override - public CreateUpdateEntitiesResult updateEntities(ITypedReferenceableInstance[] entityInstanceDefinitions) throws AtlasException { - CreateUpdateEntitiesResult result = repository.updateEntities(entityInstanceDefinitions); - onEntitiesAddedUpdated(result.getEntityResult()); - return result; - } - - private void onEntitiesAddedUpdated(EntityResult entityResult) throws AtlasException { - onEntitiesAdded(entityResult.getCreatedEntities()); - onEntitiesUpdated(entityResult.getUpdateEntities()); - //Note: doesn't access deletedEntities from entityResult - onEntitiesDeleted(RequestContext.get().getDeletedEntities()); - } - - @Override - public CreateUpdateEntitiesResult updateEntityAttributeByGuid(String guid, String attributeName, - String value) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - attributeName = ParamChecker.notEmpty(attributeName, "attribute name"); - value = ParamChecker.notEmpty(value, "attribute value"); - - ITypedReferenceableInstance existInstance = validateEntityExists(guid); - ClassType type = typeSystem.getDataType(ClassType.class, existInstance.getTypeName()); - ITypedReferenceableInstance newInstance = type.createInstance(); - - AttributeInfo attributeInfo = type.fieldMapping.fields.get(attributeName); - if (attributeInfo == null) { - throw new AtlasException("Invalid property " + attributeName + " for entity " + existInstance.getTypeName()); - } - - DataTypes.TypeCategory attrTypeCategory = attributeInfo.dataType().getTypeCategory(); - - switch(attrTypeCategory) { - case PRIMITIVE: - newInstance.set(attributeName, value); - break; - case CLASS: - Id id = new Id(value, 0, attributeInfo.dataType().getName()); - newInstance.set(attributeName, id); - break; - default: - throw new AtlasException("Update of " + attrTypeCategory + " is not supported"); - } - - ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName())); - CreateUpdateEntitiesResult result = repository.updatePartial(newInstance); - onEntitiesAddedUpdated(result.getEntityResult()); - return result; - } - - private ITypedReferenceableInstance validateEntityExists(String guid) - throws EntityNotFoundException, RepositoryException { - final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid); - if (instance == null) { - throw new EntityNotFoundException(String.format("Entity with guid %s not found ", guid)); - } - return instance; - } - - @Override - public CreateUpdateEntitiesResult updateEntityPartialByGuid(String guid, Referenceable newEntity) - throws AtlasException { - guid = ParamChecker.notEmpty(guid, "guid cannot be null"); - newEntity = ParamChecker.notNull(newEntity, "updatedEntity cannot be null"); - ITypedReferenceableInstance existInstance = validateEntityExists(guid); - - ITypedReferenceableInstance newInstance = validateAndConvertToTypedInstance(newEntity, existInstance.getTypeName()); - ((ReferenceableInstance)newInstance).replaceWithNewId(new Id(guid, 0, newInstance.getTypeName())); - - CreateUpdateEntitiesResult result = repository.updatePartial(newInstance); - onEntitiesAddedUpdated(result.getEntityResult()); - return result; - } - - @Override - public ITypedReferenceableInstance validateAndConvertToTypedInstance(IReferenceableInstance updatedEntity, String typeName) - throws AtlasException { - ClassType type = typeSystem.getDataType(ClassType.class, typeName); - ITypedReferenceableInstance newInstance = type.createInstance(updatedEntity.getId()); - - for (String attributeName : updatedEntity.getValuesMap().keySet()) { - AttributeInfo attributeInfo = type.fieldMapping.fields.get(attributeName); - if (attributeInfo == null) { - throw new AtlasException("Invalid property " + attributeName + " for entity " + updatedEntity); - } - - DataTypes.TypeCategory attrTypeCategory = attributeInfo.dataType().getTypeCategory(); - Object value = updatedEntity.get(attributeName); - switch (attrTypeCategory) { - case CLASS: - if (value != null) { - if (value instanceof Referenceable) { - newInstance.set(attributeName, value); - } else { - Id id = new Id((String) value, 0, attributeInfo.dataType().getName()); - newInstance.set(attributeName, id); - } - } - break; - - case ENUM: - case PRIMITIVE: - case ARRAY: - case STRUCT: - case MAP: - newInstance.set(attributeName, value); - break; - - case TRAIT: - //TODO - handle trait updates as well? - default: - throw new AtlasException("Update of " + attrTypeCategory + " is not supported"); - } - } - - return newInstance; - } - - @Override - public CreateUpdateEntitiesResult updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName, - String attrValue, - Referenceable updatedEntity) throws AtlasException { - typeName = ParamChecker.notEmpty(typeName, "typeName"); - uniqueAttributeName = ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName"); - attrValue = ParamChecker.notNull(attrValue, "unique attribute value"); - updatedEntity = ParamChecker.notNull(updatedEntity, "updatedEntity"); - - ITypedReferenceableInstance oldInstance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue); - - final ITypedReferenceableInstance newInstance = validateAndConvertToTypedInstance(updatedEntity, typeName); - ((ReferenceableInstance)newInstance).replaceWithNewId(oldInstance.getId()); - - CreateUpdateEntitiesResult result = repository.updatePartial(newInstance); - onEntitiesAddedUpdated(result.getEntityResult()); - return result; - } - - private void validateTypeExists(String entityType) throws AtlasException { - entityType = ParamChecker.notEmpty(entityType, "entity type"); - - IDataType type = typeSystem.getDataType(IDataType.class, entityType); - if (type.getTypeCategory() != DataTypes.TypeCategory.CLASS) { - throw new IllegalArgumentException("type " + entityType + " not a CLASS type"); - } - } - - /** - * Gets the list of trait names for a given entity represented by a guid. - * - * @param guid globally unique identifier for the entity - * @return a list of trait names for the given entity guid - * @throws AtlasException - */ - @Override - public List<String> getTraitNames(String guid) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - return repository.getTraitNames(guid); - } - - /** - * Adds a new trait to the list of existing entities represented by their respective guids - * @param entityGuids list of guids of entities - * @param traitInstance trait instance json that needs to be added to entities - * @throws AtlasException - */ - @Override - public void addTrait(List<String> entityGuids, ITypedStruct traitInstance) throws AtlasException { - Preconditions.checkNotNull(entityGuids, "entityGuids list cannot be null"); - Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null"); - - final String traitName = traitInstance.getTypeName(); - - // ensure trait type is already registered with the TypeSystem - if (!typeSystem.isRegistered(traitName)) { - String msg = String.format("trait=%s should be defined in type system before it can be added", traitName); - LOG.error(msg); - throw new TypeNotFoundException(msg); - } - - //ensure trait is not already registered with any of the given entities - for (String entityGuid : entityGuids) { - Preconditions.checkArgument(!getTraitNames(entityGuid).contains(traitName), - "trait=%s is already defined for entity=%s", traitName, entityGuid); - } - - repository.addTrait(entityGuids, traitInstance); - - for (String entityGuid : entityGuids) { - onTraitAddedToEntity(repository.getEntityDefinition(entityGuid), traitInstance); - } - } - - /** - * Adds a new trait to an existing entity represented by a guid. - * - * @param guid globally unique identifier for the entity - * @param traitInstanceDefinition trait instance json that needs to be added to entity - * @throws AtlasException - */ - @Override - public void addTrait(String guid, String traitInstanceDefinition) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - traitInstanceDefinition = ParamChecker.notEmpty(traitInstanceDefinition, "trait instance definition"); - - ITypedStruct traitInstance = deserializeTraitInstance(traitInstanceDefinition); - addTrait(guid, traitInstance); - } - - public void addTrait(String guid, ITypedStruct traitInstance) throws AtlasException { - final String traitName = traitInstance.getTypeName(); - - // ensure trait type is already registered with the TS - if (!typeSystem.isRegistered(traitName)) { - String msg = String.format("trait=%s should be defined in type system before it can be added", traitName); - LOG.error(msg); - throw new TypeNotFoundException(msg); - } - - // ensure trait is not already defined - Preconditions - .checkArgument(!getTraitNames(guid).contains(traitName), "trait=%s is already defined for entity=%s", - traitName, guid); - - repository.addTrait(guid, traitInstance); - - onTraitAddedToEntity(repository.getEntityDefinition(guid), traitInstance); - } - - private ITypedStruct deserializeTraitInstance(String traitInstanceDefinition) - throws AtlasException { - return createTraitInstance(InstanceSerialization.fromJsonStruct(traitInstanceDefinition, true)); - } - - @Override - public ITypedStruct createTraitInstance(Struct traitInstance) throws AtlasException { - try { - final String entityTypeName = ParamChecker.notEmpty(traitInstance.getTypeName(), "entity type"); - - TraitType traitType = typeSystem.getDataType(TraitType.class, entityTypeName); - return traitType.convert(traitInstance, Multiplicity.REQUIRED); - } catch (TypeNotFoundException e) { - throw e; - } catch (Exception e) { - throw new AtlasException("Error deserializing trait instance", e); - } - } - - @Override - public IStruct getTraitDefinition(String guid, final String traitName) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - - final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid); - return instance.getTrait(traitName); - } - - /** - * Deletes a given trait from an existing entity represented by a guid. - * - * @param guid globally unique identifier for the entity - * @param traitNameToBeDeleted name of the trait - * @throws AtlasException - */ - @Override - public void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - traitNameToBeDeleted = ParamChecker.notEmpty(traitNameToBeDeleted, "trait name"); - - // ensure trait type is already registered with the TS - if (!typeSystem.isRegistered(traitNameToBeDeleted)) { - final String msg = String.format("trait=%s should be defined in type system before it can be deleted", - traitNameToBeDeleted); - LOG.error(msg); - throw new TypeNotFoundException(msg); - } - - repository.deleteTrait(guid, traitNameToBeDeleted); - - onTraitDeletedFromEntity(repository.getEntityDefinition(guid), traitNameToBeDeleted); - } - - private void onTypesAdded(Map<String, IDataType> typesAdded) throws AtlasException { - for (TypesChangeListener listener : typeChangeListeners) { - listener.onAdd(typesAdded.values()); - } - } - - private void onEntitiesAdded(List<String> guids) throws AtlasException { - List<ITypedReferenceableInstance> entities = loadEntities(guids); - for (EntityChangeListener listener : entityChangeListeners) { - listener.onEntitiesAdded(entities, false); - } - } - - private List<ITypedReferenceableInstance> loadEntities(List<String> guids) throws RepositoryException, EntityNotFoundException { - return repository.getEntityDefinitions(guids.toArray(new String[guids.size()])); - } - - private void onTypesUpdated(Map<String, IDataType> typesUpdated) throws AtlasException { - for (TypesChangeListener listener : typeChangeListeners) { - listener.onChange(typesUpdated.values()); - } - } - - private void onEntitiesUpdated(List<String> guids) throws AtlasException { - List<ITypedReferenceableInstance> entities = loadEntities(guids); - for (EntityChangeListener listener : entityChangeListeners) { - listener.onEntitiesUpdated(entities, false); - } - } - - private void onTraitAddedToEntity(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { - Collection<IStruct> traits = Collections.singletonList(trait); - - for (EntityChangeListener listener : entityChangeListeners) { - listener.onTraitsAdded(entity, traits); - } - } - - private void onTraitDeletedFromEntity(ITypedReferenceableInstance entity, String traitName) throws AtlasException { - Collection<String> traitNames = Collections.singletonList(traitName); - - for (EntityChangeListener listener : entityChangeListeners) { - listener.onTraitsDeleted(entity, traitNames); - } - } - - public void registerListener(EntityChangeListener listener) { - entityChangeListeners.add(listener); - } - - public void unregisterListener(EntityChangeListener listener) { - entityChangeListeners.remove(listener); - } - - @Override - public List<EntityAuditEvent> getAuditEvents(String guid, String startKey, short count) throws AtlasException { - guid = ParamChecker.notEmpty(guid, "entity id"); - startKey = ParamChecker.notEmptyIfNotNull(startKey, "start key"); - ParamChecker.lessThan(count, maxAuditResults, "count"); - - return auditRepository.listEvents(guid, startKey, count); - } - - /* (non-Javadoc) - * @see org.apache.atlas.services.MetadataService#deleteEntities(java.lang.String) - */ - @Override - public EntityResult deleteEntities(List<String> deleteCandidateGuids) throws AtlasException { - ParamChecker.notEmpty(deleteCandidateGuids, "delete candidate guids"); - return deleteGuids(deleteCandidateGuids); - } - - @Override - public EntityResult deleteEntityByUniqueAttribute(String typeName, String uniqueAttributeName, - String attrValue) throws AtlasException { - typeName = ParamChecker.notEmpty(typeName, "delete candidate typeName"); - uniqueAttributeName = ParamChecker.notEmpty(uniqueAttributeName, "delete candidate unique attribute name"); - attrValue = ParamChecker.notEmpty(attrValue, "delete candidate unique attribute value"); - - //Throws EntityNotFoundException if the entity could not be found by its unique attribute - ITypedReferenceableInstance instance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue); - final Id instanceId = instance.getId(); - List<String> deleteCandidateGuids = new ArrayList<String>() {{ add(instanceId._getId());}}; - - return deleteGuids(deleteCandidateGuids); - } - - private EntityResult deleteGuids(List<String> deleteCandidateGuids) throws AtlasException { - EntityResult entityResult = repository.deleteEntities(deleteCandidateGuids); - onEntitiesAddedUpdated(entityResult); - return entityResult; - } - - private void onEntitiesDeleted(List<ITypedReferenceableInstance> entities) throws AtlasException { - for (EntityChangeListener listener : entityChangeListeners) { - listener.onEntitiesDeleted(entities, false); - } - } - - /** - * Create or restore the {@link TypeSystem} cache on server activation. - * - * When an instance is passive, types could be created outside of its cache by the active instance. - * Hence, when this instance becomes active, it needs to restore the cache from the backend store. - * The first time initialization happens, the indices for these types also needs to be created. - * This must happen only from the active instance, as it updates shared backend state. - */ - @Override - public void instanceIsActive() throws AtlasException { - LOG.info("Reacting to active state: restoring type system"); - restoreTypeSystem(); - } - - @Override - public void instanceIsPassive() { - LOG.info("Reacting to passive state: no action right now"); - } - - @Override - public int getHandlerOrder() { - return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder(); - } - - @Override - public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { - // All we need here is a restore of the type-system - LOG.info("TypeSystem reset invoked by TypeRegistry changes"); - try { - TypesDef typesDef = typeStore.restore(); - typeSystem.reset(); - TypeSystem.TransientTypeSystem transientTypeSystem - = typeSystem.createTransientTypeSystem(typesDef, false); - Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); - LOG.info("Number of types got from transient type system: {}", typesAdded.size()); - typeSystem.commitTypes(typesAdded); - } catch (AtlasException e) { - LOG.error("Failed to restore type-system after TypeRegistry changes", e); - throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); - } - } -}
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java index 0370c8e..0e1e5b6 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasRepositoryConfiguration.java @@ -21,13 +21,9 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; -import org.apache.atlas.repository.graph.DeleteHandler; -import org.apache.atlas.repository.graph.SoftDeleteHandler; import org.apache.atlas.repository.graphdb.GraphDatabase; import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1; -import org.apache.atlas.typesystem.types.cache.DefaultTypeCache; -import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,18 +62,6 @@ public class AtlasRepositoryConfiguration { return ApplicationProperties.get().getBoolean(ENABLE_FULLTEXT_SEARCH_PROPERTY, true); } - @SuppressWarnings("unchecked") - public static Class<? extends TypeCache> getTypeCache() { - // Get the type cache implementation class from Atlas configuration. - try { - Configuration config = ApplicationProperties.get(); - return ApplicationProperties.getClass(config, TYPE_CACHE_IMPLEMENTATION_PROPERTY, - DefaultTypeCache.class.getName(), TypeCache.class); - } catch (AtlasException e) { - LOG.error("Error loading typecache ", e); - return DefaultTypeCache.class; - } - } private static final String AUDIT_REPOSITORY_IMPLEMENTATION_PROPERTY = "atlas.EntityAuditRepository.impl"; @SuppressWarnings("unchecked") @@ -91,20 +75,8 @@ public class AtlasRepositoryConfiguration { } } - private static final String DELETE_HANDLER_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandler.impl"; private static final String DELETE_HANDLER_V1_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandlerV1.impl"; - @SuppressWarnings("unchecked") - public static Class<? extends DeleteHandler> getDeleteHandlerImpl() { - try { - Configuration config = ApplicationProperties.get(); - return ApplicationProperties.getClass(config, - DELETE_HANDLER_IMPLEMENTATION_PROPERTY, SoftDeleteHandler.class.getName(), DeleteHandler.class); - } catch (AtlasException e) { - throw new RuntimeException(e); - } - } - public static Class<? extends DeleteHandlerV1> getDeleteHandlerV1Impl() { try { Configuration config = ApplicationProperties.get(); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java b/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java index 6e22604..080eefb 100644 --- a/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java +++ b/repository/src/main/java/org/apache/atlas/util/AttributeValueMap.java @@ -23,8 +23,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.typesystem.IReferenceableInstance; /** * Map of attribute values to a collection of IndexedInstances with that attribute value. @@ -37,7 +37,7 @@ public class AttributeValueMap { //need collection in case they are adding the same entity twice? private Map<Object,Collection<IndexedInstance>> valueMap_ = new HashMap<>(); - public void put(Object value, IReferenceableInstance instance, int index) { + public void put(Object value, Referenceable instance, int index) { IndexedInstance wrapper = new IndexedInstance(instance, index); Collection<IndexedInstance> existingValues = valueMap_.get(value); if(existingValues == null) { http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/CompiledQueryCacheKey.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/CompiledQueryCacheKey.java b/repository/src/main/java/org/apache/atlas/util/CompiledQueryCacheKey.java deleted file mode 100644 index 56a5a2a..0000000 --- a/repository/src/main/java/org/apache/atlas/util/CompiledQueryCacheKey.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.util; - -import org.apache.atlas.query.QueryParams; - -/** - * Represents a key for an entry in the compiled query cache. - * - */ -public class CompiledQueryCacheKey { - - private final String dslQuery; - private final QueryParams queryParams; - - public CompiledQueryCacheKey(String dslQuery, QueryParams queryParams) { - super(); - this.dslQuery = dslQuery; - this.queryParams = queryParams; - } - - public CompiledQueryCacheKey(String dslQuery) { - super(); - this.dslQuery = dslQuery; - this.queryParams = null; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((dslQuery == null) ? 0 : dslQuery.hashCode()); - result = prime * result + ((queryParams == null) ? 0 : queryParams.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - - if (this == obj) { - return true; - } - - if (obj == null) { - return false; - } - if (!(obj instanceof CompiledQueryCacheKey)) { - return false; - } - - CompiledQueryCacheKey other = (CompiledQueryCacheKey) obj; - if (! equals(dslQuery, other.dslQuery)) { - return false; - } - - if (! equals(queryParams, other.queryParams)) { - return false; - } - - return true; - } - - private static boolean equals(Object o1, Object o2) { - if(o1 == o2) { - return true; - } - if(o1 == null) { - return o2 == null; - } - return o1.equals(o2); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java b/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java index 60ec8cc..17ff511 100644 --- a/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java +++ b/repository/src/main/java/org/apache/atlas/util/IndexedInstance.java @@ -17,8 +17,8 @@ */ package org.apache.atlas.util; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.typesystem.IReferenceableInstance; /** * Data structure that stores an IReferenceableInstance and its location within @@ -28,16 +28,16 @@ import org.apache.atlas.typesystem.IReferenceableInstance; */ public class IndexedInstance { - private final IReferenceableInstance instance_; + private final Referenceable instance_; private final int index_; - public IndexedInstance(IReferenceableInstance instance, int index) { + public IndexedInstance(Referenceable instance, int index) { super(); this.instance_ = instance; this.index_ = index; } - public IReferenceableInstance getInstance() { + public Referenceable getInstance() { return instance_; } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/NoopGremlinQuery.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/NoopGremlinQuery.java b/repository/src/main/java/org/apache/atlas/util/NoopGremlinQuery.java deleted file mode 100644 index 280570e..0000000 --- a/repository/src/main/java/org/apache/atlas/util/NoopGremlinQuery.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.util; - -import org.apache.atlas.query.GremlinQuery; -import org.apache.atlas.typesystem.types.IDataType; - -/** - * Represents a query that we know will have no results. - * - */ -public class NoopGremlinQuery extends GremlinQuery { - - private final IDataType dataType; - - public NoopGremlinQuery(IDataType dataType) { - super(null, null, null); - this.dataType = dataType; - } - - public IDataType getDataType() { - return dataType; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/java/org/apache/atlas/util/TypeDefSorter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/TypeDefSorter.java b/repository/src/main/java/org/apache/atlas/util/TypeDefSorter.java deleted file mode 100644 index 733aefd..0000000 --- a/repository/src/main/java/org/apache/atlas/util/TypeDefSorter.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.util; - -import org.apache.atlas.model.typedef.AtlasClassificationDef; -import org.apache.atlas.model.typedef.AtlasEntityDef; -import org.apache.atlas.model.typedef.AtlasStructDef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class TypeDefSorter { - private static final Logger LOG = LoggerFactory.getLogger(TypeDefSorter.class); - - public static <T extends AtlasStructDef> List<T> sortTypes(List<T> types) { - Map<String, T> typesByName = new HashMap<>(); - for (T type : types) { - typesByName.put(type.getName(), type); - } - List<T> result = new ArrayList<>(types.size()); - Set<T> processed = new HashSet<>(); - for (T type : types) { - addToResult(type, result, processed, typesByName); - } - return result; - } - - private static <T extends AtlasStructDef> void addToResult(T type, List<T> result, - Set<T> processed, - Map<String, T> typesByName) { - if (processed.contains(type)) { - return; - } - processed.add(type); - Set<String> superTypeNames = new HashSet<>(); - if (type.getClass().equals(AtlasClassificationDef.class)) { - try { - AtlasClassificationDef classificationDef = AtlasClassificationDef.class.cast(type); - superTypeNames.addAll(classificationDef.getSuperTypes()); - } catch (ClassCastException ex) { - LOG.warn("Casting to ClassificationDef failed"); - } - } - if (type.getClass().equals(AtlasEntityDef.class)) { - try { - AtlasEntityDef entityDef = AtlasEntityDef.class.cast(type); - superTypeNames.addAll(entityDef.getSuperTypes()); - } catch (ClassCastException ex) { - LOG.warn("Casting to AtlasEntityDef failed"); - } - } - - for (String superTypeName : superTypeNames) { - // Recursively add any supertypes first to the result. - T superType = typesByName.get(superTypeName); - if (superType != null) { - addToResult(superType, result, processed, typesByName); - } - } - result.add(type); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala deleted file mode 100644 index 1190114..0000000 --- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.query - -import java.util - -import org.apache.atlas.repository.graphdb.AtlasGraph -import org.apache.atlas.query.Expressions._ -import org.apache.atlas.typesystem.ITypedStruct -import org.apache.atlas.typesystem.json.{InstanceSerialization, Serialization} -import org.apache.atlas.typesystem.persistence.StructInstance -import org.apache.atlas.typesystem.types.DataTypes.{MapType, PrimitiveType} -import org.apache.atlas.typesystem.types.{DataTypes, StructType, TypeSystem} - -/** - * Represents a Query to compute the closure based on a relationship between entities of a particular type. - * For e.g. Database Tables are related to each other to capture the '''Lineage''' of data in a Table based - * on other Tables. - * - * A Closure Query is specified by the following information: - * - The Type whose instances are in a closure relationship. For e.g. 'Table' - * - The Closure relation. This is specified as an ''Attribute path''. For e.g. if we have the following model: - * {{{ - * class Table { - * name : String, - * ... - * } - * - * class LoadTableProcess { - * name : String, - * inputTables : List[Table], - * outputTable : Table, - * ... - * } - * }}} - * ''LoadTable'' instance captures the relationship between the data in an output Table and a set of input Tables. - * In order to compute the '''Lineage''' of a Table, the ''Attribute path'' that relates 2 Tables is - * '''[(LoadTableProcess,outputTable), inputTables]'''. This list is saying that for any Table I want to connect to other - * tables via the LoadProcess.outputTable attribute, and then via the inputTables attribute. So each entry in the - * Attribute Path represents an attribute in an object. For reverse relations the Type and attribute must be specified, - * as in 'LoadTableProcess,outputTable)', whereas for forward relations the attribute name is sufficient. - * - The depth of the traversal. Certain times you are not interested in the complete closure, but to only - * discover related instances up to a certain depth. Specify the depth as number of hops, or you can ask for the - * complete closure. - * - You can ask for certain attributes to be returned. For e.g. you may only want the Table name, owner and - * creationDate. By default only the Ids of the related instances is returned. - * - For pair of related instances, you optionally ask for the Path of the relation to be returned. This is - * returned as a list of ''Id''s. - * - * Given these 5 things the ClosureQuery can be executed, it returns a GremlinQueryResult of the Closure Query. - */ -trait ClosureQuery { - - val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX - val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX - - sealed trait PathAttribute { - - def toExpr : Expression = this match { - case r : Relation => fieldId(r.attributeName) - case rr : ReverseRelation => fieldId(s"${rr.typeName}->${rr.attributeName}") - } - - def toFieldName : String = this match { - case r : Relation => r.attributeName - case rr : ReverseRelation => rr.typeName - } - } - case class ReverseRelation(typeName : String, attributeName : String) extends PathAttribute - case class Relation(attributeName : String) extends PathAttribute - - /** - * Type on whose instances the closure needs to be computed - * @return - */ - def closureType : String - - /** - * specify how instances are related. - */ - def closureRelation : List[PathAttribute] - - /** - * The maximum hops between related instances. A [[None]] implies there is maximum. - * @return - */ - def depth : Option[Int] - - /** - * The attributes to return for the instances. These will be prefixed by 'src_' and 'dest_' in the - * output rows. - * @return - */ - def selectAttributes : Option[List[String]] - - /** - * specify if the Path should be returned. - * @return - */ - def withPath : Boolean - - def persistenceStrategy: GraphPersistenceStrategies - def g: AtlasGraph[_,_] - - def pathExpr : Expressions.Expression = { - closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName)) - } - - def selectExpr(alias : String) : List[Expression] = { - selectAttributes.map { _.map { a => - fieldId(alias).field(a).as(s"${alias}_$a") - } - }.getOrElse(List(fieldId(alias))) - } - - /** - * hook to allow a filter to be added for the closureType - * @param expr - * @return - */ - def srcCondition(expr : Expression) : Expression = expr - - def expr : Expressions.Expression = { - val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX). - select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*) - if (withPath) e.path else e - } - - def evaluate(): GremlinQueryResult = { - var e = expr - QueryProcessor.evaluate(e, g, persistenceStrategy) - } - - def graph(res: GremlinQueryResult) : GraphResult = { - - if (!withPath) { - throw new ExpressionException(expr, "Graph requested for non Path Query") - } - - import scala.collection.JavaConverters._ - - val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType]) - val vertexPayloadType = { - val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName). - dataType().asInstanceOf[MapType] - mT.getValueType.asInstanceOf[StructType] - } - - def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME) - - def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = { - val vP = vertexPayloadType.createInstance() - vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj) - vertexPayloadType.fieldMapping.fields.asScala.keys. - filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).foreach{a => - vP.set(a, resRow.get(s"${attrPrefix}$a")) - } - vP.asInstanceOf[StructInstance] - } - - val instance = graphResType.createInstance() - val vertices = new util.HashMap[String, AnyRef]() - val edges = new util.HashMap[String,java.util.List[String]]() - - /** - * foreach resultRow - * for each Path entry - * add an entry in the edges Map - * add an entry for the Src vertex to the vertex Map - * add an entry for the Dest vertex to the vertex Map - */ - res.rows.asScala.map(_.asInstanceOf[StructInstance]).foreach { r => - val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala - val srcVertex = path.head.asInstanceOf[StructInstance] - - var currVertex = srcVertex - path.tail.foreach { n => - val nextVertex = n.asInstanceOf[StructInstance] - val iList = if (!edges.containsKey(id(currVertex))) { - val l = new util.ArrayList[String]() - edges.put(id(currVertex), l) - l - } else { - edges.get(id(currVertex)) - } - if ( !iList.contains(id(nextVertex))) { - iList.add(id(nextVertex)) - } - currVertex = nextVertex - } - val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName) - vertices.put(id(srcVertex), vertexStruct(srcVertex, - r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], - s"${SRC_PREFIX}_")) - vertices.put(id(currVertex), vertexStruct(currVertex, - r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], - s"${DEST_PREFIX}_")) - } - - instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices) - instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges) - GraphResult(res.query, instance) - } -} - -/** - * Closure for a single instance. Instance is specified by an ''attributeToSelectInstance'' and the value - * for the attribute. - * - * @tparam T - */ -trait SingleInstanceClosureQuery[T] extends ClosureQuery { - - def attributeToSelectInstance : String - - def attributeTyp : PrimitiveType[T] - def instanceValue : T - - override def srcCondition(expr : Expression) : Expression = { - expr.where( - Expressions.fieldId(attributeToSelectInstance).`=`(Expressions.literal(attributeTyp, instanceValue)) - ) - } -} - -import scala.language.existentials; -/** - * A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS'' - * type, and the table relations are captured as attributes from a CTAS instance to Table instances. - * - * @param tableTypeName The name of the Table Type. - * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. - * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. - * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. - * @param depth depth as needed by the closure Query. - * @param selectAttributes as needed by the closure Query. - * @param withPath as needed by the closure Query. - * @param persistenceStrategy as needed to evaluate the Closure Query. - * @param g as needed to evaluate the Closure Query. - */ -case class InputLineageClosureQuery(tableTypeName : String, - attributeToSelectInstance : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: AtlasGraph[_,_] - ) extends SingleInstanceClosureQuery[String] { - - val closureType : String = tableTypeName - - val attributeTyp = DataTypes.STRING_TYPE - - val instanceValue = tableName - - lazy val closureRelation = List( - ReverseRelation(ctasTypeName, ctasOutputTableAttribute), - Relation(ctasInputTableAttribute) - ) -} - -/** - * A ClosureQuery to compute where a table is used based on the '''Lineage''' for Hive tables. - * Assumes the Lineage relation is captured in a ''CTAS'' - * type, and the table relations are captured as attributes from a CTAS instance to Table instances. - * - * @param tableTypeName The name of the Table Type. - * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. - * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. - * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. - * @param depth depth as needed by the closure Query. - * @param selectAttributes as needed by the closure Query. - * @param withPath as needed by the closure Query. - * @param persistenceStrategy as needed to evaluate the Closure Query. - * @param g as needed to evaluate the Closure Query. - */ -case class OutputLineageClosureQuery(tableTypeName : String, - attributeToSelectInstance : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: AtlasGraph[_,_] - ) extends SingleInstanceClosureQuery[String] { - - val closureType : String = tableTypeName - - val attributeTyp = DataTypes.STRING_TYPE - - val instanceValue = tableName - - lazy val closureRelation = List( - ReverseRelation(ctasTypeName, ctasInputTableAttribute), - Relation(ctasOutputTableAttribute) - ) -} - -case class GraphResult(query: String, result : ITypedStruct) { - - def toTypedJson = Serialization.toJson(result) - - def toInstanceJson = InstanceSerialization.toJson(result) -} \ No newline at end of file