Repository: incubator-atlas Updated Branches: refs/heads/master b6b6f9450 -> ea38942ba
ATLAS-1563: Entity change listener invocation in V2 Store Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/ea38942b Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/ea38942b Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/ea38942b Branch: refs/heads/master Commit: ea38942baa73774f35004d36fc1805a2368c2ace Parents: b6b6f94 Author: apoorvnaik <[email protected]> Authored: Fri Feb 17 17:21:37 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Feb 17 23:58:25 2017 -0800 ---------------------------------------------------------------------- .../converters/AtlasInstanceConverter.java | 15 ++- .../graph/v1/AtlasEntityChangeNotifier.java | 125 +++++++++++++++++++ .../store/graph/v1/AtlasEntityStoreV1.java | 43 +++++-- .../graph/v1/AtlasDeleteHandlerV1Test.java | 8 +- .../store/graph/v1/AtlasEntityStoreV1Test.java | 5 +- .../atlas/listener/EntityChangeListener.java | 1 + .../org/apache/atlas/web/rest/EntityREST.java | 12 +- 7 files changed, 185 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 9d475bf..e14fafb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -76,17 +76,24 @@ public class AtlasInstanceConverter { Iterator<AtlasEntity> entityIterator = entities.iterator(); for (int i = 0; i < entities.size(); i++) { - ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next(), ctx); + ITypedReferenceableInstance typedInstance = getITypedReferenceable(entityIterator.next()); entitiesInOldFormat[i] = typedInstance; } return entitiesInOldFormat; } - public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity, AtlasFormatConverter.ConverterContext ctx) throws AtlasBaseException { - Referenceable ref = getReferenceable(entity, ctx); + public ITypedReferenceableInstance getITypedReferenceable(AtlasEntity entity) throws AtlasBaseException { + try { + return metadataService.getEntityDefinition(entity.getGuid()); + } catch (AtlasException e) { + LOG.error("Exception while getting a typed reference for the entity ", e); + throw toAtlasBaseException(e); + } + } + public ITypedReferenceableInstance getITypedReferenceable(String guid) throws AtlasBaseException { try { - return metadataService.getTypedReferenceableInstance(ref); + return metadataService.getEntityDefinition(guid); } catch (AtlasException e) { LOG.error("Exception while getting a typed reference for the entity ", e); throw toAtlasBaseException(e); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java new file mode 100644 index 0000000..a532f31 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations.EntityOperation; +import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.CREATE; +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE; +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; + + +@Singleton +public class AtlasEntityChangeNotifier { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); + + private final Set<EntityChangeListener> entityChangeListeners; + private final AtlasInstanceConverter instanceConverter; + + @Inject + public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, + AtlasInstanceConverter instanceConverter) { + this.entityChangeListeners = entityChangeListeners; + this.instanceConverter = instanceConverter; + } + + public void onEntitiesMutated(EntityMutationResponse entityMutationResponse) throws AtlasBaseException { + if (CollectionUtils.isEmpty(entityChangeListeners) || instanceConverter == null) { + return; + } + + List<AtlasEntityHeader> createdEntities = entityMutationResponse.getCreatedEntities(); + List<AtlasEntityHeader> updatedEntities = entityMutationResponse.getUpdatedEntities(); + List<AtlasEntityHeader> partiallyUpdatedEntities = entityMutationResponse.getPartialUpdatedEntities(); + List<AtlasEntityHeader> deletedEntities = entityMutationResponse.getDeletedEntities(); + + if (CollectionUtils.isNotEmpty(createdEntities)) { + List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(createdEntities); + + notifyListeners(typedRefInst, EntityOperation.CREATE); + } + + if (CollectionUtils.isNotEmpty(updatedEntities)) { + List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(updatedEntities); + + notifyListeners(typedRefInst, EntityOperation.UPDATE); + } + + if (CollectionUtils.isNotEmpty(partiallyUpdatedEntities)) { + List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(partiallyUpdatedEntities); + + notifyListeners(typedRefInst, EntityOperation.PARTIAL_UPDATE); + } + + if (CollectionUtils.isNotEmpty(deletedEntities)) { + List<ITypedReferenceableInstance> typedRefInst = toITypedReferenceable(deletedEntities); + + notifyListeners(typedRefInst, EntityOperation.DELETE); + } + } + + private void notifyListeners(List<ITypedReferenceableInstance> typedRefInsts, EntityOperation operation) throws AtlasBaseException { + for (EntityChangeListener listener : entityChangeListeners) { + try { + switch (operation) { + case CREATE: + listener.onEntitiesAdded(typedRefInsts); + break; + case UPDATE: + case PARTIAL_UPDATE: + listener.onEntitiesUpdated(typedRefInsts); + break; + case DELETE: + listener.onEntitiesDeleted(typedRefInsts); + break; + } + } catch (AtlasException e) { + throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_FAILED, e, operation.toString()); + } + } + } + + private List<ITypedReferenceableInstance> toITypedReferenceable(List<AtlasEntityHeader> entityHeaders) throws AtlasBaseException { + List<ITypedReferenceableInstance> ret = new ArrayList<>(entityHeaders.size()); + + for (AtlasEntityHeader entityHeader : entityHeaders) { + ret.add(instanceConverter.getITypedReferenceable(entityHeader.getGuid())); + } + + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 31a5e8c..8a6501c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -25,9 +25,13 @@ import org.apache.atlas.GraphTransaction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.*; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; @@ -49,13 +53,15 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; public class AtlasEntityStoreV1 implements AtlasEntityStore { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class); - private final DeleteHandlerV1 deleteHandler; - private final AtlasTypeRegistry typeRegistry; + private final DeleteHandlerV1 deleteHandler; + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityChangeNotifier entityChangeNotifier; @Inject - public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry) { - this.deleteHandler = deleteHandler; - this.typeRegistry = typeRegistry; + public AtlasEntityStoreV1(DeleteHandlerV1 deleteHandler, AtlasTypeRegistry typeRegistry, AtlasEntityChangeNotifier entityChangeNotifier) { + this.deleteHandler = deleteHandler; + this.typeRegistry = typeRegistry; + this.entityChangeNotifier = entityChangeNotifier; } @Override @@ -208,6 +214,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { LOG.debug("<== createOrUpdate()"); } + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret); + return ret; } @@ -252,7 +261,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); deletionCandidates.add(vertex); - return deleteVertices(deletionCandidates); + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret); + + return ret; } @Override @@ -281,7 +295,13 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { if (deletionCandidates.isEmpty()) { LOG.info("No deletion candidate entities were found for guids %s", guids); } - return deleteVertices(deletionCandidates); + + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret); + + return ret; } @Override @@ -297,7 +317,12 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { Collection<AtlasVertex> deletionCandidates = new ArrayList<>(); deletionCandidates.add(vertex); - return deleteVertices(deletionCandidates); + EntityMutationResponse ret = deleteVertices(deletionCandidates); + + // Notify the change listeners + entityChangeNotifier.onEntitiesMutated(ret); + + return ret; } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java index 492abc4..c55e3f7 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasDeleteHandlerV1Test.java @@ -19,7 +19,6 @@ package org.apache.atlas.repository.store.graph.v1; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RequestContextV1; @@ -59,7 +58,6 @@ import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.TypeSystem; -import org.apache.atlas.util.AtlasRepositoryConfiguration; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -69,7 +67,6 @@ import org.testng.annotations.Test; import javax.inject.Inject; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -82,6 +79,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE; import static org.apache.atlas.TestUtils.DEPARTMENT_TYPE; import static org.apache.atlas.TestUtils.NAME; import static org.apache.atlas.TestUtils.TABLE_TYPE; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; @@ -107,6 +105,8 @@ public abstract class AtlasDeleteHandlerV1Test { private TypeSystem typeSystem = TypeSystem.getInstance(); + AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); + @BeforeClass public void setUp() throws Exception { @@ -145,7 +145,7 @@ public abstract class AtlasDeleteHandlerV1Test { @BeforeTest public void init() throws Exception { DeleteHandlerV1 deleteHandler = getDeleteHandler(typeRegistry); - entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry); + entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier); RequestContextV1.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java index dd82cb2..7f76236 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java @@ -73,6 +73,7 @@ import static org.apache.atlas.TestUtils.COLUMN_TYPE; import static org.apache.atlas.TestUtils.NAME; import static org.apache.atlas.TestUtils.randomString; import static org.apache.atlas.TestUtilsV2.TABLE_TYPE; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -98,6 +99,8 @@ public class AtlasEntityStoreV1Test { private AtlasEntityWithExtInfo dbEntity; private AtlasEntityWithExtInfo tblEntity; + AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class); + @BeforeClass public void setUp() throws Exception { @@ -128,7 +131,7 @@ public class AtlasEntityStoreV1Test { @BeforeTest public void init() throws Exception { - entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry); + entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier); RequestContextV1.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java index 4bf1d05..e9a7d1a 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java +++ b/server-api/src/main/java/org/apache/atlas/listener/EntityChangeListener.java @@ -19,6 +19,7 @@ package org.apache.atlas.listener; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/ea38942b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 2f7ba20..92ea93e 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -74,7 +74,7 @@ public class EntityREST { public static final String PREFIX_ATTR = "attr:"; private final AtlasTypeRegistry typeRegistry; - private final AtlasInstanceConverter restAdapters; + private final AtlasInstanceConverter instanceConverter; private final MetadataService metadataService; private final AtlasEntityStore entitiesStore; @@ -82,7 +82,7 @@ public class EntityREST { public EntityREST(AtlasTypeRegistry typeRegistry, AtlasInstanceConverter instanceConverter, MetadataService metadataService, AtlasEntityStore entitiesStore) { this.typeRegistry = typeRegistry; - this.restAdapters = instanceConverter; + this.instanceConverter = instanceConverter; this.metadataService = metadataService; this.entitiesStore = entitiesStore; } @@ -204,7 +204,7 @@ public class EntityREST { try { IStruct trait = metadataService.getTraitDefinition(guid, typeName); - return restAdapters.getClassification(trait); + return instanceConverter.getClassification(trait); } catch (AtlasException e) { throw toAtlasBaseException(e); @@ -231,7 +231,7 @@ public class EntityREST { List<AtlasClassification> clsList = new ArrayList<>(); for ( String traitName : metadataService.getTraitNames(guid) ) { IStruct trait = metadataService.getTraitDefinition(guid, traitName); - AtlasClassification cls = restAdapters.getClassification(trait); + AtlasClassification cls = instanceConverter.getClassification(trait); clsList.add(cls); } @@ -258,7 +258,7 @@ public class EntityREST { } for (AtlasClassification classification: classifications) { - final ITypedStruct trait = restAdapters.getTrait(classification); + final ITypedStruct trait = instanceConverter.getTrait(classification); try { metadataService.addTrait(guid, trait); } catch (IllegalArgumentException e) { @@ -378,7 +378,7 @@ public class EntityREST { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "empty entity list"); } - final ITypedStruct trait = restAdapters.getTrait(classification); + final ITypedStruct trait = instanceConverter.getTrait(classification); try { metadataService.addTrait(entityGuids, trait);
