Repository: incubator-atlas Updated Branches: refs/heads/master 9dc4cfbcb -> 0b85d5a0c
ATLAS-1240 Adding Change listeners to react on changes in TypesDef (apoorvnaik via sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0b85d5a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0b85d5a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0b85d5a0 Branch: refs/heads/master Commit: 0b85d5a0cac7b0d9ca9e3548538051f8b332bc25 Parents: 9dc4cfb Author: Suma Shivaprasad <[email protected]> Authored: Mon Oct 31 15:20:04 2016 -0700 Committer: Suma Shivaprasad <[email protected]> Committed: Mon Oct 31 15:20:04 2016 -0700 ---------------------------------------------------------------------- intg/pom.xml | 5 + .../java/org/apache/atlas/AtlasErrorCode.java | 5 +- .../atlas/exception/AtlasBaseException.java | 14 +- .../apache/atlas/listener/ChangedTypeDefs.java | 70 ++++++ .../atlas/listener/TypeDefChangeListener.java | 24 ++ .../org/apache/atlas/type/AtlasTypeUtil.java | 18 +- .../atlas/model/typedef/TestAtlasStructDef.java | 5 +- release-log.txt | 1 + .../apache/atlas/RepositoryMetadataModule.java | 7 + .../graph/GraphBackedSearchIndexer.java | 233 +++++++++++++++++-- .../store/graph/AtlasTypeDefGraphStore.java | 150 +++++++++++- .../graph/v1/AtlasTypeDefGraphStoreV1.java | 6 +- .../atlas/services/DefaultMetadataService.java | 56 +++-- .../org/apache/atlas/BaseRepositoryTest.java | 7 +- .../test/java/org/apache/atlas/TestUtils.java | 43 ++-- .../GraphBackedDiscoveryServiceTest.java | 37 +-- ...hBackedMetadataRepositoryDeleteTestBase.java | 55 ++--- .../GraphBackedMetadataRepositoryTest.java | 45 ++-- .../graph/GraphBackedSearchIndexerMockTest.java | 18 +- .../atlas/repository/graph/GraphHelperTest.java | 34 +-- .../graph/GraphRepoMapperScaleTest.java | 17 +- .../service/DefaultMetadataServiceTest.java | 72 +++--- .../apache/atlas/query/QueryTestsUtils.scala | 3 +- typesystem/pom.xml | 4 + .../atlas/typesystem/types/TypeSystem.java | 4 +- .../org/apache/atlas/web/rest/TypesREST.java | 1 - .../service/ActiveInstanceElectorModule.java | 2 + 27 files changed, 732 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/pom.xml ---------------------------------------------------------------------- diff --git a/intg/pom.xml b/intg/pom.xml index 3ac19cf..ad16635 100644 --- a/intg/pom.xml +++ b/intg/pom.xml @@ -55,6 +55,11 @@ <version>${codehaus.jackson.version}</version> </dependency> + <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + <version>${javax-inject.version}</version> + </dependency> <dependency> <groupId>org.testng</groupId> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index caffb6a..709fcbc 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -51,7 +51,10 @@ public enum AtlasErrorCode { TYPE_HAS_REFERENCES(409, "ATLAS4092E", "Given type {0} has references"), TYPE_MATCH_FAILED(409, "ATLAS4093E", "Given type {0} doesn't match {1}"), - INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"); + INTERNAL_ERROR(500, "ATLAS5001E", "Internal server error {0}"), + INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), + INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}") + ; private String errorCode; private String errorMessage; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/main/java/org/apache/atlas/exception/AtlasBaseException.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/exception/AtlasBaseException.java b/intg/src/main/java/org/apache/atlas/exception/AtlasBaseException.java index d26ec0e..b88d4e8 100644 --- a/intg/src/main/java/org/apache/atlas/exception/AtlasBaseException.java +++ b/intg/src/main/java/org/apache/atlas/exception/AtlasBaseException.java @@ -42,6 +42,11 @@ public class AtlasBaseException extends Exception { this.atlasErrorCode = AtlasErrorCode.INTERNAL_ERROR; } + public AtlasBaseException(AtlasErrorCode errorCode, Throwable cause, String... params) { + super(errorCode.getFormattedErrorMessage(params), cause); + this.atlasErrorCode = errorCode; + } + public AtlasBaseException(String message, Throwable cause) { super(message, cause); this.atlasErrorCode = AtlasErrorCode.INTERNAL_ERROR; @@ -52,7 +57,14 @@ public class AtlasBaseException extends Exception { this.atlasErrorCode = AtlasErrorCode.INTERNAL_ERROR; } - public AtlasBaseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + public AtlasBaseException(AtlasErrorCode errorCode, Throwable cause, boolean enableSuppression, + boolean writableStackTrace, String ... params) { + super(errorCode.getFormattedErrorMessage(params), cause, enableSuppression, writableStackTrace); + this.atlasErrorCode = AtlasErrorCode.INTERNAL_ERROR; + } + + public AtlasBaseException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); this.atlasErrorCode = AtlasErrorCode.INTERNAL_ERROR; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/main/java/org/apache/atlas/listener/ChangedTypeDefs.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/listener/ChangedTypeDefs.java b/intg/src/main/java/org/apache/atlas/listener/ChangedTypeDefs.java new file mode 100644 index 0000000..0636677 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/listener/ChangedTypeDefs.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.listener; + +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; + +import java.util.ArrayList; +import java.util.List; + +public class ChangedTypeDefs { + private List<? extends AtlasBaseTypeDef> createTypeDefs; + private List<? extends AtlasBaseTypeDef> updatedTypeDefs; + private List<? extends AtlasBaseTypeDef> deletedTypeDefs; + + public ChangedTypeDefs(List<? extends AtlasBaseTypeDef> createTypeDefs, + List<? extends AtlasBaseTypeDef> updatedTypeDefs, + List<? extends AtlasBaseTypeDef> deletedTypeDefs) { + this.createTypeDefs = createTypeDefs; + this.updatedTypeDefs = updatedTypeDefs; + this.deletedTypeDefs = deletedTypeDefs; + } + + public ChangedTypeDefs() { + createTypeDefs = new ArrayList<>(); + updatedTypeDefs = new ArrayList<>(); + deletedTypeDefs = new ArrayList<>(); + } + + public List<? extends AtlasBaseTypeDef> getCreateTypeDefs() { + return createTypeDefs; + } + + public ChangedTypeDefs setCreateTypeDefs(List<? extends AtlasBaseTypeDef> createTypeDefs) { + this.createTypeDefs = createTypeDefs; + return this; + } + + public List<? extends AtlasBaseTypeDef> getUpdatedTypeDefs() { + return updatedTypeDefs; + } + + public ChangedTypeDefs setUpdatedTypeDefs(List<? extends AtlasBaseTypeDef> updatedTypeDefs) { + this.updatedTypeDefs = updatedTypeDefs; + return this; + } + + public List<? extends AtlasBaseTypeDef> getDeletedTypeDefs() { + return deletedTypeDefs; + } + + public ChangedTypeDefs setDeletedTypeDefs(List<? extends AtlasBaseTypeDef> deletedTypeDefs) { + this.deletedTypeDefs = deletedTypeDefs; + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/main/java/org/apache/atlas/listener/TypeDefChangeListener.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/listener/TypeDefChangeListener.java b/intg/src/main/java/org/apache/atlas/listener/TypeDefChangeListener.java new file mode 100644 index 0000000..e8ac8f4 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/listener/TypeDefChangeListener.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.listener; + +import org.apache.atlas.exception.AtlasBaseException; + +public interface TypeDefChangeListener { + void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java index 160f714..5c4da9a 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeUtil.java @@ -18,17 +18,19 @@ package org.apache.atlas.type; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP; import org.apache.commons.lang.StringUtils; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_KEY_VAL_SEP; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_PREFIX; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_MAP_SUFFIX; + /** * Utility methods for AtlasType/AtlasTypeDef. */ @@ -36,9 +38,7 @@ public class AtlasTypeUtil { private static final Set<String> ATLAS_BUILTIN_TYPENAMES = new HashSet<String>(); static { - for (String typeName : AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES) { - ATLAS_BUILTIN_TYPENAMES.add(typeName); - } + Collections.addAll(ATLAS_BUILTIN_TYPENAMES, AtlasBaseTypeDef.ATLAS_BUILTIN_TYPES); } public static Set<String> getReferencedTypeNames(String typeName) { @@ -63,6 +63,7 @@ public class AtlasTypeUtil { && StringUtils.endsWith(typeName, ATLAS_TYPE_MAP_SUFFIX); } + public static String getStringValue(Map map, Object key) { Object ret = map != null ? map.get(key) : null; @@ -90,6 +91,5 @@ public class AtlasTypeUtil { referencedTypeNames.add(typeName); } } - } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/intg/src/test/java/org/apache/atlas/model/typedef/TestAtlasStructDef.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/model/typedef/TestAtlasStructDef.java b/intg/src/test/java/org/apache/atlas/model/typedef/TestAtlasStructDef.java index b87b33e..8d2bfe2 100644 --- a/intg/src/test/java/org/apache/atlas/model/typedef/TestAtlasStructDef.java +++ b/intg/src/test/java/org/apache/atlas/model/typedef/TestAtlasStructDef.java @@ -17,17 +17,16 @@ */ package org.apache.atlas.model.typedef; -import java.util.List; - import org.apache.atlas.model.ModelTestUtil; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.type.AtlasType; import org.testng.annotations.Test; +import java.util.List; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class TestAtlasStructDef { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f6eb0d4..df88769 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ALL CHANGES: +ATLAS-1240 Adding Change listeners to react on changes in TypesDef (apoorvnaik via sumasai) ATLAS-1239 when stopping Atlas on the command line it should explicitly say when it has stopped (ayubkhan via sumasai) ATLAS-1253 Extract error codes into AtlasErrorCode Enum (apoorvnaik via sumasai) ATLAS-1195 Clean up DSL Translation (jnhagelb via dkantor) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index bbf75ae..129591a 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -29,6 +29,7 @@ import org.apache.atlas.discovery.DiscoveryService; import org.apache.atlas.discovery.LineageService; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.audit.EntityAuditListener; @@ -76,6 +77,12 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { Multibinder.newSetBinder(binder(), TypesChangeListener.class); typesChangeListenerBinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton(); + // New typesdef/instance change listener should also be bound to the corresponding implementation + Multibinder<TypeDefChangeListener> typeDefChangeListenerMultibinder = + Multibinder.newSetBinder(binder(), TypeDefChangeListener.class); + typeDefChangeListenerMultibinder.addBinding().to(DefaultMetadataService.class); + typeDefChangeListenerMultibinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton(); + // bind the MetadataService interface to an implementation bind(MetadataService.class).to(DefaultMetadataService.class).asEagerSingleton(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 8038815..3aaca9c 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -18,21 +18,22 @@ package org.apache.atlas.repository.graph; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import javax.inject.Inject; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.discovery.SearchIndexer; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.listener.ChangedTypeDefs; +import org.apache.atlas.listener.TypeDefChangeListener; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.IndexCreationException; import org.apache.atlas.repository.IndexException; @@ -42,6 +43,13 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphIndex; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasPropertyKey; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasEnumType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; @@ -49,17 +57,39 @@ import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructType; import org.apache.atlas.typesystem.types.TraitType; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; + +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BOOLEAN; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BYTE; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATE; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DOUBLE; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_FLOAT; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_INT; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_LONG; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_SHORT; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_STRING; /** * Adds index for properties of a given type when its added before any instances are added. */ -public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler { +public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler, + TypeDefChangeListener { private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); @@ -70,19 +100,23 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang add(BigInteger.class); } }; - + + // Added for type lookup when indexing the new typedefs + private final AtlasTypeRegistry typeRegistry; + //allows injection of a dummy graph for testing private IAtlasGraphProvider provider; @Inject - public GraphBackedSearchIndexer() throws RepositoryException, AtlasException { - this(new AtlasGraphProvider(), ApplicationProperties.get()); + public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException { + this(new AtlasGraphProvider(), ApplicationProperties.get(), typeRegistry); } @VisibleForTesting - GraphBackedSearchIndexer( IAtlasGraphProvider provider, Configuration configuration) + GraphBackedSearchIndexer( IAtlasGraphProvider provider, Configuration configuration, AtlasTypeRegistry typeRegistry) throws IndexException, RepositoryException { this.provider = provider; + this.typeRegistry = typeRegistry; if (!HAConfiguration.isHAEnabled(configuration)) { initialize(provider.get()); } @@ -211,6 +245,117 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang onAdd(dataTypes); } + private void addIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) { + if (typeDef instanceof AtlasEnumDef) { + // Only handle complex types like Struct, Classification and Entity + return; + } + if (typeDef instanceof AtlasStructDef) { + AtlasStructDef structDef = (AtlasStructDef) typeDef; + List<AtlasAttributeDef> attributeDefs = structDef.getAttributeDefs(); + if (CollectionUtils.isNotEmpty(attributeDefs)) { + for (AtlasAttributeDef attributeDef : attributeDefs) { + createIndexForAttribute(management, typeDef.getName(), attributeDef); + } + } + } else if (!AtlasTypeUtil.isBuiltInType(typeDef.getName())){ + throw new IllegalArgumentException("bad data type" + typeDef.getName()); + } + } + + private void createIndexForAttribute(AtlasGraphManagement management, String typeName, + AtlasAttributeDef attributeDef) { + final String propertyName = GraphHelper.encodePropertyKey(typeName + "." + attributeDef.getName()); + AtlasCardinality cardinality = toAtlasCardinality(attributeDef.getCardinality()); + boolean isUnique = attributeDef.isUnique(); + boolean isIndexable = attributeDef.isIndexable(); + String attribTypeName = attributeDef.getTypeName(); + boolean isBuiltInType = AtlasTypeUtil.isBuiltInType(attribTypeName); + boolean isArrayType = AtlasTypeUtil.isArrayType(attribTypeName); + boolean isMapType = AtlasTypeUtil.isMapType(attribTypeName); + + + try { + AtlasType atlasType = typeRegistry.getType(attribTypeName); + + if (isMapType || isArrayType || isClassificationType(atlasType) || isEntityType(atlasType)) { + LOG.warn("Ignoring non-indexable attribute {}", attribTypeName); + } + + if (isBuiltInType) { + createIndexes(management, propertyName, getPrimitiveClass(attribTypeName), isUnique, cardinality, false, isIndexable); + } + + if (isEnumType(atlasType)) { + createIndexes(management, propertyName, String.class, isUnique, cardinality, false, isIndexable); + } + + if (isStructType(atlasType)) { + AtlasStructDef structDef = typeRegistry.getStructDefByName(attributeDef.getName()); + updateIndexForTypeDef(management, structDef); + } + } catch (AtlasBaseException e) { + LOG.error("No type exists for {}", attribTypeName, e); + } + } + + private boolean isEntityType(AtlasType type) { + return type instanceof AtlasEntityType; + } + + private boolean isClassificationType(AtlasType type) { + return type instanceof AtlasClassificationType; + } + + private boolean isEnumType(AtlasType type) { + return type instanceof AtlasEnumType; + } + + private boolean isStructType(AtlasType type) { + return type instanceof AtlasStructType; + } + + private Class getPrimitiveClass(String attribTypeName) { + switch (attribTypeName.toLowerCase()) { + case ATLAS_TYPE_BOOLEAN: + return Boolean.class; + case ATLAS_TYPE_BYTE: + return Byte.class; + case ATLAS_TYPE_SHORT: + return Short.class; + case ATLAS_TYPE_INT: + return Integer.class; + case ATLAS_TYPE_LONG: + case ATLAS_TYPE_DATE: + return Long.class; + case ATLAS_TYPE_FLOAT: + return Float.class; + case ATLAS_TYPE_DOUBLE: + return Double.class; + case ATLAS_TYPE_BIGINTEGER: + return BigInteger.class; + case ATLAS_TYPE_BIGDECIMAL: + return BigDecimal.class; + case ATLAS_TYPE_STRING: + return String.class; + } + + throw new IllegalArgumentException(String.format("Unknown primitive typename %s", attribTypeName)); + } + + private AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) { + switch (cardinality) { + case SINGLE: + return AtlasCardinality.SINGLE; + case LIST: + return AtlasCardinality.LIST; + case SET: + return AtlasCardinality.SET; + } + // Should never reach this point + throw new IllegalArgumentException(String.format("Bad cardinality %s", cardinality)); + } + private void addIndexForType(AtlasGraphManagement management, IDataType dataType) { switch (dataType.getTypeCategory()) { case PRIMITIVE: @@ -456,9 +601,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang LOG.info("Reacting to active: initializing index"); try { initialize(); - } catch (RepositoryException e) { - throw new AtlasException("Error in reacting to active on initialization", e); - } catch (IndexException e) { + } catch (RepositoryException | IndexException e) { throw new AtlasException("Error in reacting to active on initialization", e); } } @@ -467,7 +610,59 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang public void instanceIsPassive() { LOG.info("Reacting to passive state: No action right now."); } - + + @Override + public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { + LOG.info("Adding indexes for changed typedefs"); + AtlasGraphManagement management = null; + try { + management = provider.get().getManagementSystem(); + + // Update index for newly created types + if (CollectionUtils.isNotEmpty(changedTypeDefs.getCreateTypeDefs())) { + for (AtlasBaseTypeDef typeDef : changedTypeDefs.getCreateTypeDefs()) { + updateIndexForTypeDef(management, typeDef); + } + } + + // Update index for updated types + if (CollectionUtils.isNotEmpty(changedTypeDefs.getUpdatedTypeDefs())) { + for (AtlasBaseTypeDef typeDef : changedTypeDefs.getUpdatedTypeDefs()) { + updateIndexForTypeDef(management, typeDef); + } + } + + //Commit indexes + commit(management); + } catch (RepositoryException | IndexException e) { + LOG.error("Failed to update indexes for changed typedefs", e); + attemptRollback(changedTypeDefs, management); + } + + } + + private void attemptRollback(ChangedTypeDefs changedTypeDefs, AtlasGraphManagement management) + throws AtlasBaseException { + if (null != management) { + try { + rollback(management); + } catch (IndexException e) { + LOG.error("Index rollback has failed", e); + throw new AtlasBaseException(AtlasErrorCode.INDEX_ROLLBACK_FAILED, e, + changedTypeDefs.toString()); + } + } + } + + private void updateIndexForTypeDef(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) { + Preconditions.checkNotNull(typeDef, "Cannot index on null typedefs"); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating indexes for type name={}, definition={}", typeDef.getName(), typeDef.getClass()); + } + addIndexForType(management, typeDef); + LOG.info("Index creation for type {} complete", typeDef.getName()); + } + /* Commenting this out since we do not need an index for edge label here private void createEdgeMixedIndex(String propertyName) { EdgeLabel edgeLabel = management.getEdgeLabel(propertyName); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index ebc7ab2..68d2781 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -17,10 +17,18 @@ */ package org.apache.atlas.repository.store.graph; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.listener.ChangedTypeDefs; +import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs; import org.apache.atlas.model.typedef.AtlasEntityDef; @@ -41,22 +49,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; /** * Abstract class for graph persistence store for TypeDef */ -public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { +public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(AtlasTypeDefGraphStore.class); private final AtlasTypeRegistry typeRegistry; - protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry) { + private final Set<TypeDefChangeListener> typeDefChangeListeners; + + protected AtlasTypeDefGraphStore(AtlasTypeRegistry typeRegistry, + Set<TypeDefChangeListener> typeDefChangeListeners) { this.typeRegistry = typeRegistry; + this.typeDefChangeListeners = typeDefChangeListeners; } protected abstract AtlasEnumDefStore getEnumDefStore(AtlasTypeRegistry typeRegistry); @@ -92,6 +106,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { ttr.updateGuid(ret.getName(), ret.getGuid()); + notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -139,6 +155,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -153,6 +171,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -163,10 +183,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteEnumDefByName(String name) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasEnumDef byName = typeRegistry.getEnumDefByName(name); + ttr.removeTypeByName(name); getEnumDefStore(ttr).deleteByName(name); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -175,10 +199,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteEnumDefByGuid(String guid) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasEnumDef byGuid = typeRegistry.getEnumDefByGuid(guid); + ttr.removeTypeByGuid(guid); getEnumDefStore(ttr).deleteByGuid(guid); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -203,6 +231,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { ttr.updateGuid(ret.getName(), ret.getGuid()); + notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -250,6 +280,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -264,6 +296,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -274,10 +308,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteStructDefByName(String name) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasStructDef byName = typeRegistry.getStructDefByName(name); + ttr.removeTypeByName(name); getStructDefStore(ttr).deleteByName(name, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -286,10 +324,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteStructDefByGuid(String guid) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasStructDef byGuid = typeRegistry.getStructDefByGuid(guid); + ttr.removeTypeByGuid(guid); getStructDefStore(ttr).deleteByGuid(guid, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -315,6 +357,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { ttr.updateGuid(ret.getName(), ret.getGuid()); + notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -364,6 +408,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -379,6 +425,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -389,10 +437,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteClassificationDefByName(String name) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasClassificationDef byName = typeRegistry.getClassificationDefByName(name); + ttr.removeTypeByName(name); getClassificationDefStore(ttr).deleteByName(name, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -401,10 +453,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteClassificationDefByGuid(String guid) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasClassificationDef byGuid = typeRegistry.getClassificationDefByGuid(guid); + ttr.removeTypeByGuid(guid); getClassificationDefStore(ttr).deleteByGuid(guid, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -429,6 +485,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { ttr.updateGuid(ret.getName(), ret.getGuid()); + notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -476,6 +534,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -490,6 +550,8 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef); + notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); + typeRegistry.commitTransientTypeRegistry(ttr); return ret; @@ -500,10 +562,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteEntityDefByName(String name) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasEntityDef byName = typeRegistry.getEntityDefByName(name); + ttr.removeTypeByName(name); getEntityDefStore(ttr).deleteByName(name, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -512,10 +578,14 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { public void deleteEntityDefByGuid(String guid) throws AtlasBaseException { AtlasTransientTypeRegistry ttr = typeRegistry.createTransientTypeRegistry(); + AtlasEntityDef byGuid = typeRegistry.getEntityDefByGuid(guid); + ttr.removeTypeByGuid(guid); getEntityDefStore(ttr).deleteByGuid(guid, null); + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); + typeRegistry.commitTransientTypeRegistry(ttr); } @@ -619,6 +689,17 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { } } + List<AtlasBaseTypeDef> createdTypeDefs = new ArrayList<>(); + createdTypeDefs.addAll(ret.getEnumDefs()); + createdTypeDefs.addAll(ret.getStructDefs()); + createdTypeDefs.addAll(ret.getClassificationDefs()); + createdTypeDefs.addAll(ret.getEntityDefs()); + + ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); + changedTypeDefs.setCreateTypeDefs(createdTypeDefs); + + notifyListeners(changedTypeDefs); + typeRegistry.commitTransientTypeRegistry(ttr); if (LOG.isDebugEnabled()) { @@ -678,6 +759,17 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { } } + List<AtlasBaseTypeDef> updatedTypeDefs = new ArrayList<>(); + updatedTypeDefs.addAll(ret.getEnumDefs()); + updatedTypeDefs.addAll(ret.getStructDefs()); + updatedTypeDefs.addAll(ret.getClassificationDefs()); + updatedTypeDefs.addAll(ret.getEntityDefs()); + + ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); + changedTypeDefs.setUpdatedTypeDefs(updatedTypeDefs); + + notifyListeners(changedTypeDefs); + typeRegistry.commitTransientTypeRegistry(ttr); if (LOG.isDebugEnabled()) { @@ -792,6 +884,11 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { } } + Iterable<AtlasBaseTypeDef> deleted = Iterables.concat(typesDef.getEnumDefs(), typesDef.getClassificationDefs(), + typesDef.getClassificationDefs(), typesDef.getEntityDefs()); + + notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(deleted)); + typeRegistry.commitTransientTypeRegistry(ttr); if (LOG.isDebugEnabled()) { @@ -845,4 +942,53 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore { } return typesDef; } + + @Override + public void instanceIsActive() throws AtlasException { + try { + init(); + } catch (AtlasBaseException e) { + LOG.error("Failed to init after becoming active", e); + } + } + + @Override + public void instanceIsPassive() throws AtlasException { + LOG.info("Not reacting to a Passive state change"); + } + + private void notifyListeners(TypeDefChangeType type, List<? extends AtlasBaseTypeDef> typeDefs) + throws AtlasBaseException { + ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); + switch (type) { + case CREATE: + changedTypeDefs.setCreateTypeDefs(typeDefs); + break; + case UPDATE: + changedTypeDefs.setUpdatedTypeDefs(typeDefs); + break; + case DELETE: + changedTypeDefs.setDeletedTypeDefs(typeDefs); + break; + } + + notifyListeners(changedTypeDefs); + } + + private void notifyListeners(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) { + for (TypeDefChangeListener changeListener : typeDefChangeListeners) { + try { + changeListener.onChange(changedTypeDefs); + } catch (AtlasBaseException e) { + LOG.error("OnChange failed for listener {}", changeListener.getClass().getName()); + throw e; + } + } + } + } + + private enum TypeDefChangeType { + CREATE, UPDATE, DELETE + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java index 1f40f87..878f355 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java @@ -22,6 +22,7 @@ import com.google.inject.Inject; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasEntityDef; @@ -65,8 +66,9 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { protected final AtlasGraph atlasGraph = AtlasGraphProvider.getGraphInstance(); @Inject - public AtlasTypeDefGraphStoreV1(AtlasTypeRegistry typeRegistry) { - super(typeRegistry); + public AtlasTypeDefGraphStoreV1(AtlasTypeRegistry typeRegistry, + Set<TypeDefChangeListener> typeDefChangeListeners) { + super(typeRegistry, typeDefChangeListeners); LOG.info("==> AtlasTypeDefGraphStoreV1()"); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 5b4eb0e..95c3dd9 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -18,29 +18,25 @@ package org.apache.atlas.services; -import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS; -import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; - -import javax.inject.Inject; -import javax.inject.Singleton; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Provider; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.RequestContext; import org.apache.atlas.classification.InterfaceAudience; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.listener.ChangedTypeDefs; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.listener.TypesChangeListener; -import org.apache.atlas.query.QueryParser; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.audit.EntityAuditRepository; @@ -72,17 +68,23 @@ import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Provider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + +import javax.inject.Inject; +import javax.inject.Singleton; + +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS; +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; @@ -91,7 +93,7 @@ import com.google.inject.Provider; * for listening to changes to the repository. */ @Singleton -public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler { +public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler, TypeDefChangeListener { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); private final short maxAuditResults; @@ -774,4 +776,22 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang public void instanceIsPassive() { LOG.info("Reacting to passive state: no action right now"); } + + @Override + public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { + // All we need here is a restore of the type-system + LOG.info("TypeSystem reset invoked by TypeRegistry changes"); + try { + TypesDef typesDef = typeStore.restore(); + typeSystem.reset(); + TypeSystem.TransientTypeSystem transientTypeSystem + = typeSystem.createTransientTypeSystem(typesDef, false); + Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); + LOG.info("Number of types got from transient type system: " + typesAdded.size()); + typeSystem.commitTypes(typesAdded); + } catch (AtlasException e) { + LOG.error("Failed to restore type-system after TypeRegistry changes", e); + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java index 71a8756..03d155c 100644 --- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java @@ -20,10 +20,12 @@ package org.apache.atlas; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; + import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.services.MetadataService; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.TypesDef; @@ -42,11 +44,12 @@ import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.testng.annotations.Guice; -import javax.inject.Inject; import java.util.ArrayList; import java.util.Date; import java.util.List; +import javax.inject.Inject; + /** * Base Class to set up hive types and instances for tests */ @@ -65,7 +68,7 @@ public class BaseRepositoryTest { //force graph initialization / built in type registration TestUtils.getGraph(); setUpTypes(); - new GraphBackedSearchIndexer(); + new GraphBackedSearchIndexer(new AtlasTypeRegistry()); TestUtils.resetRequestContext(); setupInstances(); TestUtils.dumpGraph(TestUtils.getGraph()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/TestUtils.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java index 30071ba..abb8e94 100755 --- a/repository/src/test/java/org/apache/atlas/TestUtils.java +++ b/repository/src/test/java/org/apache/atlas/TestUtils.java @@ -18,23 +18,9 @@ package org.apache.atlas; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Provider; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypesChangeListener; @@ -48,6 +34,7 @@ import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.services.DefaultMetadataService; import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.ReservedTypesRegistrar; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.TypesDef; @@ -73,9 +60,23 @@ import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; import org.testng.Assert; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Provider; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef; /** * Test utility class. @@ -513,7 +514,7 @@ public final class TestUtils { catch(Throwable t) { typeCache = new DefaultTypeCache(); } - final GraphBackedSearchIndexer indexer = new GraphBackedSearchIndexer(); + final GraphBackedSearchIndexer indexer = new GraphBackedSearchIndexer(new AtlasTypeRegistry()); Provider<TypesChangeListener> indexerProvider = new Provider<TypesChangeListener>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java index 13b7d22..645fef1 100755 --- a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java @@ -18,22 +18,7 @@ package org.apache.atlas.discovery; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.inject.Inject; +import com.google.common.collect.ImmutableSet; import org.apache.atlas.AtlasException; import org.apache.atlas.BaseRepositoryTest; @@ -46,6 +31,7 @@ import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.persistence.Id; @@ -65,7 +51,22 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.inject.Inject; + +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; @Guice(modules = RepositoryMetadataModule.class) public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest { @@ -119,7 +120,7 @@ public class GraphBackedDiscoveryServiceTest extends BaseRepositoryTest { //We need to commit the transaction before creating the indices to release the locks held by the transaction. //otherwise, the index commit will fail while waiting for the those locks to be released. AtlasGraphProvider.getGraphInstance().commit(); - GraphBackedSearchIndexer idx = new GraphBackedSearchIndexer(); + GraphBackedSearchIndexer idx = new GraphBackedSearchIndexer(new AtlasTypeRegistry()); idx.onAdd(newTypes); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryDeleteTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryDeleteTestBase.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryDeleteTestBase.java index 6de995b..9e850a9 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryDeleteTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryDeleteTestBase.java @@ -18,30 +18,8 @@ package org.apache.atlas.repository.graph; -import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME; -import static org.apache.atlas.TestUtils.COLUMN_TYPE; -import static org.apache.atlas.TestUtils.NAME; -import static org.apache.atlas.TestUtils.PII; -import static org.apache.atlas.TestUtils.PROCESS_TYPE; -import static org.apache.atlas.TestUtils.TABLE_TYPE; -import static org.apache.atlas.TestUtils.createColumnEntity; -import static org.apache.atlas.TestUtils.createDBEntity; -import static org.apache.atlas.TestUtils.createTableEntity; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClient.EntityResult; @@ -55,6 +33,7 @@ import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -84,8 +63,30 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME; +import static org.apache.atlas.TestUtils.COLUMN_TYPE; +import static org.apache.atlas.TestUtils.NAME; +import static org.apache.atlas.TestUtils.PII; +import static org.apache.atlas.TestUtils.PROCESS_TYPE; +import static org.apache.atlas.TestUtils.TABLE_TYPE; +import static org.apache.atlas.TestUtils.createColumnEntity; +import static org.apache.atlas.TestUtils.createDBEntity; +import static org.apache.atlas.TestUtils.createTableEntity; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * Test for GraphBackedMetadataRepository.deleteEntities @@ -110,7 +111,7 @@ public abstract class GraphBackedMetadataRepositoryDeleteTestBase { typeSystem = TypeSystem.getInstance(); typeSystem.reset(); - new GraphBackedSearchIndexer(); + new GraphBackedSearchIndexer(new AtlasTypeRegistry()); final GraphBackedMetadataRepository delegate = new GraphBackedMetadataRepository(getDeleteHandler(typeSystem)); repositoryService = (MetadataRepository)Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java index 7a57518..725b9a6 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java @@ -18,25 +18,8 @@ package org.apache.atlas.repository.graph; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import javax.inject.Inject; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.atlas.GraphTransaction; import org.apache.atlas.RepositoryMetadataModule; @@ -52,6 +35,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphQuery; import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedStruct; @@ -78,11 +62,28 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.inject.Inject; import scala.actors.threadpool.Arrays; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createUniqueRequiredAttrDef; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + /** * GraphBackedMetadataRepository test * @@ -107,7 +108,7 @@ public class GraphBackedMetadataRepositoryTest { typeSystem = TypeSystem.getInstance(); typeSystem.reset(); - new GraphBackedSearchIndexer(); + new GraphBackedSearchIndexer(new AtlasTypeRegistry()); TestUtils.defineDeptEmployeeTypes(typeSystem); TestUtils.createHiveTypes(typeSystem); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerMockTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerMockTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerMockTest.java index 398ea62..2a07f02 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerMockTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerMockTest.java @@ -18,10 +18,6 @@ package org.apache.atlas.repository.graph; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - import org.apache.atlas.AtlasException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.repository.Constants; @@ -29,12 +25,17 @@ import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.configuration.Configuration; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider { @Mock @@ -46,6 +47,9 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider { @Mock private AtlasGraphManagement management; + @Mock + private AtlasTypeRegistry typeRegistry; + @BeforeMethod public void setup() { MockitoAnnotations.initMocks(this); @@ -57,7 +61,7 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider { when(graph.getManagementSystem()).thenReturn(management); when(management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); - GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration); + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration, typeRegistry); verify(management).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); } @@ -69,7 +73,7 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider { when(graph.getManagementSystem()).thenReturn(management); when(management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); - new GraphBackedSearchIndexer(this, configuration); + new GraphBackedSearchIndexer(this, configuration, typeRegistry); verifyZeroInteractions(management); } @@ -81,7 +85,7 @@ public class GraphBackedSearchIndexerMockTest implements IAtlasGraphProvider { when(graph.getManagementSystem()).thenReturn(management); when(management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); - GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration); + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(this, configuration, typeRegistry); graphBackedSearchIndexer.instanceIsActive(); verify(management).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java index fe15014..a7dc13d 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java @@ -18,26 +18,13 @@ package org.apache.atlas.repository.graph; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import javax.inject.Inject; - import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.TestUtils; import org.apache.atlas.repository.graph.GraphHelper.VertexInfo; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.types.TypeSystem; import org.testng.Assert; @@ -47,6 +34,20 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import javax.inject.Inject; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + @Guice(modules = RepositoryMetadataModule.class) public class GraphHelperTest { @@ -72,12 +73,15 @@ public class GraphHelperTest { private TypeSystem typeSystem; + @Inject + private AtlasTypeRegistry typeRegistry; + @BeforeClass public void setUp() throws Exception { typeSystem = TypeSystem.getInstance(); typeSystem.reset(); - new GraphBackedSearchIndexer(); + new GraphBackedSearchIndexer(typeRegistry); TestUtils.defineDeptEmployeeTypes(typeSystem); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java index 49ef551..a03f965 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java @@ -18,13 +18,6 @@ package org.apache.atlas.repository.graph; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.Iterator; - -import javax.inject.Inject; - import org.apache.atlas.ApplicationProperties; import org.apache.atlas.GraphTransaction; import org.apache.atlas.RepositoryMetadataModule; @@ -35,6 +28,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery; import org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator; import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; @@ -50,6 +44,13 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; + +import javax.inject.Inject; + @Test @Guice(modules = RepositoryMetadataModule.class) public class GraphRepoMapperScaleTest { @@ -72,7 +73,7 @@ public class GraphRepoMapperScaleTest { public void setUp() throws Exception { //force up front graph initialization TestUtils.getGraph(); - searchIndexer = new GraphBackedSearchIndexer(new AtlasGraphProvider(), ApplicationProperties.get()); + searchIndexer = new GraphBackedSearchIndexer(new AtlasGraphProvider(), ApplicationProperties.get(), new AtlasTypeRegistry()); //Make sure we can cleanup the index directory Collection<IDataType> typesAdded = TestUtils.createHiveTypes(typeSystem); searchIndexer.onAdd(typesAdded); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java index 96cefe2..82fe380 100644 --- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java @@ -18,29 +18,9 @@ package org.apache.atlas.service; -import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME; -import static org.apache.atlas.TestUtils.COLUMN_TYPE; -import static org.apache.atlas.TestUtils.PII; -import static org.apache.atlas.TestUtils.TABLE_TYPE; -import static org.apache.atlas.TestUtils.createColumnEntity; -import static org.apache.atlas.TestUtils.createDBEntity; -import static org.apache.atlas.TestUtils.createInstance; -import static org.apache.atlas.TestUtils.createTableEntity; -import static org.apache.atlas.TestUtils.randomString; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; -import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Inject; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; @@ -49,12 +29,15 @@ import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RequestContext; import org.apache.atlas.TestUtils; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.ChangedTypeDefs; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.query.QueryParams; import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; import org.apache.atlas.repository.audit.HBaseTestUtils; import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.services.DefaultMetadataService; import org.apache.atlas.services.MetadataService; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; @@ -75,6 +58,7 @@ import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.ValueConversionException; +import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.lang.RandomStringUtils; @@ -87,9 +71,29 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.TestUtils.COLUMNS_ATTR_NAME; +import static org.apache.atlas.TestUtils.COLUMN_TYPE; +import static org.apache.atlas.TestUtils.PII; +import static org.apache.atlas.TestUtils.TABLE_TYPE; +import static org.apache.atlas.TestUtils.createColumnEntity; +import static org.apache.atlas.TestUtils.createDBEntity; +import static org.apache.atlas.TestUtils.createInstance; +import static org.apache.atlas.TestUtils.createTableEntity; +import static org.apache.atlas.TestUtils.randomString; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; @Guice(modules = RepositoryMetadataModule.class) public class DefaultMetadataServiceTest { @@ -1131,6 +1135,22 @@ public class DefaultMetadataServiceTest { } } + @Test + public void testOnChangeRefresh() { + try { + List<String> beforeChangeTypeNames = metadataService.getTypeNames(new HashMap<TypeCache.TYPE_FILTER, String>()); + + ((DefaultMetadataService)metadataService).onChange(new ChangedTypeDefs()); + + List<String> afterChangeTypeNames = metadataService.getTypeNames(new HashMap<TypeCache.TYPE_FILTER, String>()); + assertEquals(afterChangeTypeNames, beforeChangeTypeNames); + } catch (AtlasBaseException e) { + fail("Should've succeeded", e); + } catch (AtlasException e) { + fail("getTypeNames should've succeeded", e); + } + } + private static class EntitiesChangeListener implements EntityChangeListener { private List<String> deletedEntities = new ArrayList<>(); private List<String> updatedEntities = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala ---------------------------------------------------------------------- diff --git a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala index 33275d3..c844558 100755 --- a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala +++ b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala @@ -74,7 +74,8 @@ trait GraphUtils { object QueryTestsUtils extends GraphUtils { def setupTypesAndIndices() : Unit = { - val indexer = new GraphBackedSearchIndexer(); + // FIXME: Do we need to init the AtlasTypeRegistry here ? + val indexer = new GraphBackedSearchIndexer(null); val typesDef : TypesDef = defineTypes; val newTypes = TypeSystem.getInstance.defineTypes(typesDef); indexer.onAdd(newTypes.values()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/typesystem/pom.xml ---------------------------------------------------------------------- diff --git a/typesystem/pom.xml b/typesystem/pom.xml index c564442..28c077d 100755 --- a/typesystem/pom.xml +++ b/typesystem/pom.xml @@ -116,6 +116,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-intg</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java index 52637e6..d73a7b3 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java @@ -20,6 +20,7 @@ package org.apache.atlas.typesystem.types; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; + import org.apache.atlas.AtlasException; import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.typesystem.TypesDef; @@ -30,7 +31,6 @@ import org.apache.atlas.typesystem.types.cache.TypeCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Singleton; import java.lang.reflect.Constructor; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -41,6 +41,8 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; +import javax.inject.Singleton; + @Singleton @InterfaceAudience.Private public class TypeSystem { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java index a2cfc62..d0cb209 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/TypesREST.java @@ -31,7 +31,6 @@ import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasStructDefs; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.web.util.Servlets; import org.apache.http.annotation.Experimental; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0b85d5a0/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java index ee6035d..1f67f9f 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorModule.java @@ -24,6 +24,7 @@ import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.notification.NotificationHookConsumer; import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.store.graph.v1.AtlasTypeDefGraphStoreV1; import org.apache.atlas.service.Service; import org.apache.atlas.services.DefaultMetadataService; @@ -41,6 +42,7 @@ public class ActiveInstanceElectorModule extends AbstractModule { activeStateChangeHandlerBinder.addBinding().to(DefaultMetadataService.class); activeStateChangeHandlerBinder.addBinding().to(NotificationHookConsumer.class); activeStateChangeHandlerBinder.addBinding().to(HBaseBasedAuditRepository.class); + activeStateChangeHandlerBinder.addBinding().to(AtlasTypeDefGraphStoreV1.class); Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); serviceBinder.addBinding().to(ActiveInstanceElectorService.class);
