Repository: incubator-atlas Updated Branches: refs/heads/master 6a24cad18 -> f3bbdc151
ATLAS-1266: fixed typedef APIs to update type-registry only on successful graph commit Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f3bbdc15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f3bbdc15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f3bbdc15 Branch: refs/heads/master Commit: f3bbdc151d3aed6c8843f763c06d12c81033de7b Parents: 6a24cad Author: Madhan Neethiraj <[email protected]> Authored: Wed Nov 2 17:13:00 2016 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Nov 3 13:40:17 2016 -0700 ---------------------------------------------------------------------- .../apache/atlas/type/AtlasTypeRegistry.java | 152 +++++++++++---- .../atlas/GraphTransactionInterceptor.java | 62 ++++-- .../apache/atlas/RepositoryMetadataModule.java | 2 + .../graph/GraphBackedSearchIndexer.java | 2 +- .../store/graph/AtlasTypeDefGraphStore.java | 191 +++++++------------ .../graph/v1/AtlasTypeDefGraphStoreV1.java | 38 ++-- 6 files changed, 249 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java index 95a5054..8924d44 100644 --- a/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java +++ b/intg/src/main/java/org/apache/atlas/type/AtlasTypeRegistry.java @@ -32,8 +32,10 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -110,6 +112,9 @@ public class AtlasTypeRegistry { return ret; } + public AtlasBaseTypeDef getTypeDefByName(String name) { return registryData.getTypeDefByName(name); } + + public AtlasBaseTypeDef getTypeDefByGuid(String guid) { return registryData.getTypeDefByGuid(guid); } public Collection<AtlasEnumDef> getAllEnumDefs() { return registryData.enumDefs.getAll(); } @@ -168,6 +173,7 @@ public class AtlasTypeRegistry { final TypeDefCache<AtlasStructDef> structDefs; final TypeDefCache<AtlasClassificationDef> classificationDefs; final TypeDefCache<AtlasEntityDef> entityDefs; + final TypeDefCache<? extends AtlasBaseTypeDef>[] allDefCaches; RegistryData() { allTypes = new TypeCache(); @@ -175,6 +181,7 @@ public class AtlasTypeRegistry { structDefs = new TypeDefCache<>(allTypes); classificationDefs = new TypeDefCache<>(allTypes); entityDefs = new TypeDefCache<>(allTypes); + allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs }; allTypes.addType(new AtlasBuiltInTypes.AtlasBooleanType()); allTypes.addType(new AtlasBuiltInTypes.AtlasByteType()); @@ -196,6 +203,39 @@ public class AtlasTypeRegistry { structDefs = new TypeDefCache<>(other.structDefs, allTypes); classificationDefs = new TypeDefCache<>(other.classificationDefs, allTypes); entityDefs = new TypeDefCache<>(other.entityDefs, allTypes); + allDefCaches = new TypeDefCache[] { enumDefs, structDefs, classificationDefs, entityDefs }; + } + + AtlasBaseTypeDef getTypeDefByName(String name) { + AtlasBaseTypeDef ret = null; + + if (name != null) { + for (TypeDefCache typeDefCache : allDefCaches) { + ret = typeDefCache.getTypeDefByName(name); + + if (ret != null) { + break; + } + } + } + + return ret; + } + + AtlasBaseTypeDef getTypeDefByGuid(String guid) { + AtlasBaseTypeDef ret = null; + + if (guid != null) { + for (TypeDefCache typeDefCache : allDefCaches) { + ret = typeDefCache.getTypeDefByGuid(guid); + + if (ret != null) { + break; + } + } + } + + return ret; } void updateGuid(String typeName, String guid) { @@ -227,6 +267,10 @@ public class AtlasTypeRegistry { } public static class AtlasTransientTypeRegistry extends AtlasTypeRegistry { + private List<AtlasBaseTypeDef> addedTypes = new ArrayList<>(); + private List<AtlasBaseTypeDef> updatedTypes = new ArrayList<>(); + private List<AtlasBaseTypeDef> deletedTypes = new ArrayList<>(); + private AtlasTransientTypeRegistry(AtlasTypeRegistry parent) { super(parent); @@ -261,7 +305,6 @@ public class AtlasTypeRegistry { registryData.updateGuid(typeName, guid); - if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeRegistry.updateGuid({}, {})", typeName, guid); } @@ -391,9 +434,15 @@ public class AtlasTypeRegistry { } if (guid != null) { + AtlasBaseTypeDef typeDef = getTypeDefByGuid(guid); + registryData.removeByGuid(guid); resolveReferences(); + + if (typeDef != null) { + deletedTypes.add(typeDef); + } } if (LOG.isDebugEnabled()) { @@ -407,9 +456,15 @@ public class AtlasTypeRegistry { } if (name != null) { + AtlasBaseTypeDef typeDef = getTypeDefByName(name); + registryData.removeByName(name); resolveReferences(); + + if (typeDef != null) { + deletedTypes.add(typeDef); + } } if (LOG.isDebugEnabled()) { @@ -417,6 +472,12 @@ public class AtlasTypeRegistry { } } + public List<AtlasBaseTypeDef> getAddedTypes() { return addedTypes; } + + public List<AtlasBaseTypeDef> getUpdatedTypes() { return updatedTypes; } + + public List<AtlasBaseTypeDef> getDeleteedTypes() { return deletedTypes; } + private void addTypeWithNoRefResolve(AtlasBaseTypeDef typeDef) { if (LOG.isDebugEnabled()) { @@ -442,6 +503,8 @@ public class AtlasTypeRegistry { registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); } + + addedTypes.add(typeDef); } if (LOG.isDebugEnabled()) { @@ -490,29 +553,32 @@ public class AtlasTypeRegistry { LOG.debug("==> AtlasTypeRegistry.updateTypeByGuidWithNoRefResolve({})", guid); } - if (guid == null || typeDef == null) { + if (guid != null && typeDef != null) { // ignore - } else if (typeDef.getClass().equals(AtlasEnumDef.class)) { - AtlasEnumDef enumDef = (AtlasEnumDef)typeDef; + if (typeDef.getClass().equals(AtlasEnumDef.class)) { + AtlasEnumDef enumDef = (AtlasEnumDef) typeDef; - registryData.enumDefs.removeTypeDefByGuid(guid); - registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); - } else if (typeDef.getClass().equals(AtlasStructDef.class)) { - AtlasStructDef structDef = (AtlasStructDef)typeDef; + registryData.enumDefs.removeTypeDefByGuid(guid); + registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); + } else if (typeDef.getClass().equals(AtlasStructDef.class)) { + AtlasStructDef structDef = (AtlasStructDef) typeDef; - registryData.structDefs.removeTypeDefByGuid(guid); - registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); - } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { - AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef; + registryData.structDefs.removeTypeDefByGuid(guid); + registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); + } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { + AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef; - registryData.classificationDefs.removeTypeDefByGuid(guid); - registryData.classificationDefs.addType(classificationDef, - new AtlasClassificationType(classificationDef)); - } else if (typeDef.getClass().equals(AtlasEntityDef.class)) { - AtlasEntityDef entityDef = (AtlasEntityDef)typeDef; + registryData.classificationDefs.removeTypeDefByGuid(guid); + registryData.classificationDefs.addType(classificationDef, + new AtlasClassificationType(classificationDef)); + } else if (typeDef.getClass().equals(AtlasEntityDef.class)) { + AtlasEntityDef entityDef = (AtlasEntityDef) typeDef; + + registryData.entityDefs.removeTypeDefByGuid(guid); + registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); + } - registryData.entityDefs.removeTypeDefByGuid(guid); - registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); + updatedTypes.add(typeDef); } if (LOG.isDebugEnabled()) { @@ -525,29 +591,31 @@ public class AtlasTypeRegistry { LOG.debug("==> AtlasTypeRegistry.updateTypeByNameWithNoRefResolve({})", name); } - if (name == null || typeDef == null) { - // ignore - } else if (typeDef.getClass().equals(AtlasEnumDef.class)) { - AtlasEnumDef enumDef = (AtlasEnumDef)typeDef; - - registryData.enumDefs.removeTypeDefByName(name); - registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); - } else if (typeDef.getClass().equals(AtlasStructDef.class)) { - AtlasStructDef structDef = (AtlasStructDef)typeDef; - - registryData.structDefs.removeTypeDefByName(name); - registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); - } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { - AtlasClassificationDef classificationDef = (AtlasClassificationDef)typeDef; - - registryData.classificationDefs.removeTypeDefByName(name); - registryData.classificationDefs.addType(classificationDef, - new AtlasClassificationType(classificationDef)); - } else if (typeDef.getClass().equals(AtlasEntityDef.class)) { - AtlasEntityDef entityDef = (AtlasEntityDef)typeDef; - - registryData.entityDefs.removeTypeDefByName(name); - registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); + if (name != null && typeDef != null) { + if (typeDef.getClass().equals(AtlasEnumDef.class)) { + AtlasEnumDef enumDef = (AtlasEnumDef) typeDef; + + registryData.enumDefs.removeTypeDefByName(name); + registryData.enumDefs.addType(enumDef, new AtlasEnumType(enumDef)); + } else if (typeDef.getClass().equals(AtlasStructDef.class)) { + AtlasStructDef structDef = (AtlasStructDef) typeDef; + + registryData.structDefs.removeTypeDefByName(name); + registryData.structDefs.addType(structDef, new AtlasStructType(structDef)); + } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) { + AtlasClassificationDef classificationDef = (AtlasClassificationDef) typeDef; + + registryData.classificationDefs.removeTypeDefByName(name); + registryData.classificationDefs.addType(classificationDef, + new AtlasClassificationType(classificationDef)); + } else if (typeDef.getClass().equals(AtlasEntityDef.class)) { + AtlasEntityDef entityDef = (AtlasEntityDef) typeDef; + + registryData.entityDefs.removeTypeDefByName(name); + registryData.entityDefs.addType(entityDef, new AtlasEntityType(entityDef)); + } + + updatedTypes.add(typeDef); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java index 1f8affe..c773bac 100644 --- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java +++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java @@ -26,8 +26,14 @@ import org.apache.atlas.typesystem.exception.SchemaNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + public class GraphTransactionInterceptor implements MethodInterceptor { private static final Logger LOG = LoggerFactory.getLogger(GraphTransactionInterceptor.class); + + private static final ThreadLocal<List<PostTransactionHook>> postTransactionHooks = new ThreadLocal<>(); + private AtlasGraph graph; @Override @@ -37,19 +43,38 @@ public class GraphTransactionInterceptor implements MethodInterceptor { graph = AtlasGraphProvider.getGraphInstance(); } + boolean isSuccess = false; + try { - Object response = invocation.proceed(); - graph.commit(); - LOG.info("graph commit"); - return response; - } catch (Throwable t) { - if (logException(t)) { - LOG.error("graph rollback due to exception ", t); - } else { - LOG.error("graph rollback due to exception " + t.getClass().getSimpleName() + ":" + t.getMessage()); + try { + Object response = invocation.proceed(); + graph.commit(); + isSuccess = true; + LOG.info("graph commit"); + return response; + } catch (Throwable t) { + if (logException(t)) { + LOG.error("graph rollback due to exception ", t); + } else { + LOG.error("graph rollback due to exception " + t.getClass().getSimpleName() + ":" + t.getMessage()); + } + graph.rollback(); + throw t; + } + } finally { + List<PostTransactionHook> trxHooks = postTransactionHooks.get(); + + if (trxHooks != null) { + postTransactionHooks.remove(); + + for (PostTransactionHook trxHook : trxHooks) { + try { + trxHook.onComplete(isSuccess); + } catch (Throwable t) { + LOG.error("postTransactionHook failed", t); + } + } } - graph.rollback(); - throw t; } } @@ -59,4 +84,19 @@ public class GraphTransactionInterceptor implements MethodInterceptor { } return true; } + + public static abstract class PostTransactionHook { + protected PostTransactionHook() { + List<PostTransactionHook> trxHooks = postTransactionHooks.get(); + + if (trxHooks == null) { + trxHooks = new ArrayList<>(); + postTransactionHooks.set(trxHooks); + } + + trxHooks.add(this); + } + + public abstract void onComplete(boolean isSuccess); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index 0325c80..cd44318 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -48,6 +48,7 @@ import org.apache.atlas.services.IBootstrapTypesRegistrar; import org.apache.atlas.services.MetadataService; import org.apache.atlas.services.ReservedTypesRegistrar; import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.TypeSystemProvider; import org.apache.atlas.typesystem.types.cache.TypeCache; @@ -71,6 +72,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { // bind the ITypeStore interface to an implementation bind(ITypeStore.class).to(GraphBackedTypeStore.class).asEagerSingleton(); bind(AtlasTypeDefStore.class).to(AtlasTypeDefGraphStoreV1.class).asEagerSingleton(); + bind(AtlasTypeRegistry.class).asEagerSingleton(); //GraphBackedSearchIndexer must be an eager singleton to force the search index creation to happen before //we try to restore the type system (otherwise we'll end up running queries http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 3c7f63b..67b5362 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -285,7 +285,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang } else if (isEnumType(atlasType)) { createIndexes(management, propertyName, String.class, isUnique, cardinality, false, isIndexable); } else if (isStructType(atlasType)) { - AtlasStructDef structDef = typeRegistry.getStructDefByName(attributeDef.getName()); + AtlasStructDef structDef = typeRegistry.getStructDefByName(attribTypeName); updateIndexForTypeDef(management, structDef); } } catch (AtlasBaseException e) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java index 68d2781..2163e01 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasTypeDefGraphStore.java @@ -17,18 +17,15 @@ */ package org.apache.atlas.repository.store.graph; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.GraphTransaction; +import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.ChangedTypeDefs; import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.model.SearchFilter; -import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasClassificationDef.AtlasClassificationDefs; import org.apache.atlas.model.typedef.AtlasEntityDef; @@ -49,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -106,9 +102,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -155,9 +149,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasEnumDef ret = getEnumDefStore(ttr).updateByName(name, enumDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -171,9 +163,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasEnumDef ret = getEnumDefStore(ttr).updateByGuid(guid, enumDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -189,9 +179,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getEnumDefStore(ttr).deleteByName(name); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -205,9 +193,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getEnumDefStore(ttr).deleteByGuid(guid); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -231,9 +217,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -280,9 +264,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasStructDef ret = getStructDefStore(ttr).updateByName(name, structDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -296,9 +278,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasStructDef ret = getStructDefStore(ttr).updateByGuid(guid, structDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -314,9 +294,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getStructDefStore(ttr).deleteByName(name, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -330,9 +308,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getStructDefStore(ttr).deleteByGuid(guid, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -357,9 +333,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -408,9 +382,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByName(name, classificationDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -425,9 +397,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasClassificationDef ret = getClassificationDefStore(ttr).updateByGuid(guid, classificationDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -443,9 +413,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getClassificationDefStore(ttr).deleteByName(name, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -459,9 +427,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getClassificationDefStore(ttr).deleteByGuid(guid, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -485,9 +451,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ ttr.updateGuid(ret.getName(), ret.getGuid()); - notifyListeners(TypeDefChangeType.CREATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -534,9 +498,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasEntityDef ret = getEntityDefStore(ttr).updateByName(name, entityDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -550,9 +512,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ AtlasEntityDef ret = getEntityDefStore(ttr).updateByGuid(guid, entityDef); - notifyListeners(TypeDefChangeType.UPDATE, Arrays.asList(ret)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); return ret; } @@ -568,9 +528,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getEntityDefStore(ttr).deleteByName(name, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byName)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -584,9 +542,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ getEntityDefStore(ttr).deleteByGuid(guid, null); - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(byGuid)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); } @Override @@ -689,18 +645,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - List<AtlasBaseTypeDef> createdTypeDefs = new ArrayList<>(); - createdTypeDefs.addAll(ret.getEnumDefs()); - createdTypeDefs.addAll(ret.getStructDefs()); - createdTypeDefs.addAll(ret.getClassificationDefs()); - createdTypeDefs.addAll(ret.getEntityDefs()); - - ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); - changedTypeDefs.setCreateTypeDefs(createdTypeDefs); - - notifyListeners(changedTypeDefs); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.createTypesDef(enums={}, structs={}, classfications={}, entities={})", @@ -759,18 +704,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - List<AtlasBaseTypeDef> updatedTypeDefs = new ArrayList<>(); - updatedTypeDefs.addAll(ret.getEnumDefs()); - updatedTypeDefs.addAll(ret.getStructDefs()); - updatedTypeDefs.addAll(ret.getClassificationDefs()); - updatedTypeDefs.addAll(ret.getEntityDefs()); - - ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); - changedTypeDefs.setUpdatedTypeDefs(updatedTypeDefs); - - notifyListeners(changedTypeDefs); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.updateTypesDef(enums={}, structs={}, classfications={}, entities={})", @@ -884,12 +818,7 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ } } - Iterable<AtlasBaseTypeDef> deleted = Iterables.concat(typesDef.getEnumDefs(), typesDef.getClassificationDefs(), - typesDef.getClassificationDefs(), typesDef.getEntityDefs()); - - notifyListeners(TypeDefChangeType.DELETE, Lists.newArrayList(deleted)); - - typeRegistry.commitTransientTypeRegistry(ttr); + updateTypeRegistryPostCommit(ttr); if (LOG.isDebugEnabled()) { LOG.debug("<== AtlasTypeDefGraphStore.deleteTypesDef(enums={}, structs={}, classfications={}, entities={})", @@ -957,38 +886,50 @@ public abstract class AtlasTypeDefGraphStore implements AtlasTypeDefStore, Activ LOG.info("Not reacting to a Passive state change"); } - private void notifyListeners(TypeDefChangeType type, List<? extends AtlasBaseTypeDef> typeDefs) - throws AtlasBaseException { - ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(); - switch (type) { - case CREATE: - changedTypeDefs.setCreateTypeDefs(typeDefs); - break; - case UPDATE: - changedTypeDefs.setUpdatedTypeDefs(typeDefs); - break; - case DELETE: - changedTypeDefs.setDeletedTypeDefs(typeDefs); - break; - } - - notifyListeners(changedTypeDefs); - } - - private void notifyListeners(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { - if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) { - for (TypeDefChangeListener changeListener : typeDefChangeListeners) { - try { - changeListener.onChange(changedTypeDefs); - } catch (AtlasBaseException e) { - LOG.error("OnChange failed for listener {}", changeListener.getClass().getName()); - throw e; - } + private void updateTypeRegistryPostCommit(AtlasTransientTypeRegistry ttr) { + new TypeRegistryUpdateHook(ttr); + } + + private class TypeRegistryUpdateHook extends GraphTransactionInterceptor.PostTransactionHook { + private final AtlasTransientTypeRegistry ttr; + + private TypeRegistryUpdateHook(AtlasTransientTypeRegistry ttr) { + super(); + + this.ttr = ttr; + } + + @Override + public void onComplete(boolean isSuccess) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> TypeRegistryUpdateHook.onComplete({})", isSuccess); + } + + if (isSuccess) { + typeRegistry.commitTransientTypeRegistry(ttr); + + notifyListeners(ttr); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== TypeRegistryUpdateHook.onComplete({})", isSuccess); } } - } - private enum TypeDefChangeType { - CREATE, UPDATE, DELETE + private void notifyListeners(AtlasTransientTypeRegistry ttr) { + if (CollectionUtils.isNotEmpty(typeDefChangeListeners)) { + ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(ttr.getAddedTypes(), + ttr.getUpdatedTypes(), + ttr.getDeleteedTypes()); + + for (TypeDefChangeListener changeListener : typeDefChangeListeners) { + try { + changeListener.onChange(changedTypeDefs); + } catch (Throwable t) { + LOG.error("OnChange failed for listener {}", changeListener.getClass().getName(), t); + } + } + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f3bbdc15/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java index 878f355..73b64a3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasTypeDefGraphStoreV1.java @@ -110,9 +110,9 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); } - public AtlasGraph getAtlasGraph() { return atlasGraph; } + AtlasGraph getAtlasGraph() { return atlasGraph; } - public AtlasVertex findTypeVertexByName(String typeName) { + AtlasVertex findTypeVertexByName(String typeName) { Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) .has(Constants.TYPENAME_PROPERTY_KEY, typeName) .vertices().iterator(); @@ -122,7 +122,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) { + AtlasVertex findTypeVertexByNameAndCategory(String typeName, TypeCategory category) { Iterator results = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) .has(Constants.TYPENAME_PROPERTY_KEY, typeName) .has(TYPE_CATEGORY_PROPERTY_KEY, category) @@ -133,7 +133,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasVertex findTypeVertexByGuid(String typeGuid) { + AtlasVertex findTypeVertexByGuid(String typeGuid) { Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) .has(Constants.GUID_PROPERTY_KEY, typeGuid) .vertices().iterator(); @@ -143,7 +143,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) { + AtlasVertex findTypeVertexByGuidAndCategory(String typeGuid, TypeCategory category) { Iterator<AtlasVertex> vertices = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) .has(Constants.GUID_PROPERTY_KEY, typeGuid) .has(TYPE_CATEGORY_PROPERTY_KEY, category) @@ -154,7 +154,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) { + Iterator<AtlasVertex> findTypeVerticesByCategory(TypeCategory category) { Iterator<AtlasVertex> ret = atlasGraph.query().has(VERTEX_TYPE_PROPERTY_KEY, VERTEX_TYPE) .has(TYPE_CATEGORY_PROPERTY_KEY, category) .vertices().iterator(); @@ -162,7 +162,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) { + AtlasVertex createTypeVertex(AtlasBaseTypeDef typeDef) { // Validate all the required checks Preconditions.checkArgument(StringUtils.isNotBlank(typeDef.getName()), "Type name can't be null/empty"); @@ -203,7 +203,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) { + void updateTypeVertex(AtlasBaseTypeDef typeDef, AtlasVertex vertex) { if (!isTypeVertex(vertex)) { LOG.warn("updateTypeVertex(): not a type-vertex - {}", vertex); @@ -223,7 +223,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { markVertexUpdated(vertex); } - public void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException { + void deleteTypeVertexOutEdges(AtlasVertex vertex) throws AtlasBaseException { Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT); for (AtlasEdge edge : edges) { @@ -231,7 +231,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { } } - public void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException { + void deleteTypeVertex(AtlasVertex vertex) throws AtlasBaseException { Iterator<AtlasEdge> inEdges = vertex.getEdges(AtlasEdgeDirection.IN).iterator(); if (inEdges.hasNext()) { @@ -247,7 +247,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { atlasGraph.removeVertex(vertex); } - public void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) { + void vertexToTypeDef(AtlasVertex vertex, AtlasBaseTypeDef typeDef) { String name = vertex.getProperty(Constants.TYPENAME_PROPERTY_KEY, String.class); String description = vertex.getProperty(Constants.TYPEDESCRIPTION_PROPERTY_KEY, String.class); String typeVersion = vertex.getProperty(Constants.TYPEVERSION_PROPERTY_KEY, String.class); @@ -274,7 +274,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { } } - public boolean isTypeVertex(AtlasVertex vertex) { + boolean isTypeVertex(AtlasVertex vertex) { String vertexType = vertex.getProperty(Constants.VERTEX_TYPE_PROPERTY_KEY, String.class); boolean ret = VERTEX_TYPE.equals(vertexType); @@ -282,7 +282,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) { + boolean isTypeVertex(AtlasVertex vertex, TypeCategory category) { boolean ret = false; if (isTypeVertex(vertex)) { @@ -294,7 +294,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) { + boolean isTypeVertex(AtlasVertex vertex, TypeCategory[] categories) { boolean ret = false; if (isTypeVertex(vertex)) { @@ -312,7 +312,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + AtlasEdge getOrCreateEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { AtlasEdge ret = null; Iterable<AtlasEdge> edges = outVertex.getEdges(AtlasEdgeDirection.OUT, edgeLabel); @@ -330,13 +330,13 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - public AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { + AtlasEdge addEdge(AtlasVertex outVertex, AtlasVertex inVertex, String edgeLabel) { AtlasEdge ret = atlasGraph.addEdge(outVertex, inVertex, edgeLabel); return ret; } - public void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory) + void createSuperTypeEdges(AtlasVertex vertex, Set<String> superTypes, TypeCategory typeCategory) throws AtlasBaseException { Set<String> currentSuperTypes = getSuperTypeNames(vertex); @@ -355,7 +355,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { } } - public Set<String> getSuperTypeNames(AtlasVertex vertex) { + Set<String> getSuperTypeNames(AtlasVertex vertex) { Set<String> ret = new HashSet<>(); Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.OUT, AtlasGraphUtilsV1.SUPERTYPE_EDGE_LABEL); @@ -366,7 +366,7 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { return ret; } - private TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) { + TypeCategory getTypeCategory(AtlasBaseTypeDef typeDef) { TypeCategory ret = null; if (typeDef instanceof AtlasEntityDef) {
