ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/2cae42c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/2cae42c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/2cae42c0 Branch: refs/heads/branch-0.6-incubating Commit: 2cae42c08a4122db672f3188a46b8385992ddc02 Parents: fa502b2 Author: Suma Shivaprasad <[email protected]> Authored: Fri Dec 18 16:07:57 2015 +0530 Committer: Suma Shivaprasad <[email protected]> Committed: Fri Dec 18 16:07:57 2015 +0530 ---------------------------------------------------------------------- distro/src/conf/application.properties | 1 + .../org/apache/atlas/kafka/KafkaConsumer.java | 6 ++ .../AbstractNotificationConsumer.java | 5 +- .../notification/NotificationConsumer.java | 9 +- .../notification/NotificationHookConsumer.java | 69 +++++++++------ release-log.txt | 1 + .../atlas/repository/graph/GraphHelper.java | 2 +- .../graph/TypedInstanceToGraphMapper.java | 4 + .../src/main/resources/application.properties | 1 + .../notification/EntityNotificationIT.java | 89 ++++---------------- .../NotificationHookConsumerIT.java | 8 +- .../atlas/web/resources/BaseResourceIT.java | 86 ++++++++++--------- .../web/resources/EntityJerseyResourceIT.java | 37 +++----- 13 files changed, 144 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/distro/src/conf/application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties index f9888bd..cb6ee31 100755 --- a/distro/src/conf/application.properties +++ b/distro/src/conf/application.properties @@ -57,6 +57,7 @@ atlas.kafka.bootstrap.servers=localhost:9027 atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 +atlas.kafka.auto.offset.reset=smallest atlas.kafka.hook.group.id=atlas http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java index d4e07c0..1f05df4 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java @@ -69,4 +69,10 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> { consumerId, message.topic(), message.partition(), message.offset(), message.message()); return (String) message.message(); } + + @Override + protected String peekMessage() { + MessageAndMetadata message = (MessageAndMetadata) iterator.peek(); + return (String) message.message(); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index 42a4e7f..b6a9d7b 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -94,10 +94,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon } @Override - public void remove() { - throw new UnsupportedOperationException("The remove method is not supported."); + public T peek() { + return GSON.fromJson(peekMessage(), type); } + protected abstract String peekMessage(); // ----- inner class : ImmutableListDeserializer --------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 346ec3e..d2da975 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -17,8 +17,11 @@ package org.apache.atlas.notification; -import java.util.Iterator; - // TODO : docs! -public interface NotificationConsumer<T> extends Iterator<T>{ +public interface NotificationConsumer<T>{ + boolean hasNext(); + + T next(); + + T peek(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index e02aafa..3352cd0 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,10 +19,10 @@ package org.apache.atlas.notification; import com.google.inject.Inject; import com.google.inject.Singleton; +import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasServiceException; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; @@ -98,6 +98,14 @@ public class NotificationHookConsumer implements Service { this.consumer = consumer; } + private boolean hasNext() { + try { + return consumer.hasNext(); + } catch(ConsumerTimeoutException e) { + return false; + } + } + @Override public void run() { @@ -105,34 +113,39 @@ public class NotificationHookConsumer implements Service { return; } - while(consumer.hasNext()) { - HookNotification.HookNotificationMessage message = consumer.next(); - + while(true) { try { - switch (message.getType()) { - case ENTITY_CREATE: - HookNotification.EntityCreateRequest createRequest = - (HookNotification.EntityCreateRequest) message; - atlasClient.createEntity(createRequest.getEntities()); - break; - - case ENTITY_PARTIAL_UPDATE: - HookNotification.EntityPartialUpdateRequest partialUpdateRequest = - (HookNotification.EntityPartialUpdateRequest) message; - atlasClient.updateEntity(partialUpdateRequest.getTypeName(), - partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(), - partialUpdateRequest.getEntity()); - break; - - case ENTITY_FULL_UPDATE: - HookNotification.EntityUpdateRequest updateRequest = - (HookNotification.EntityUpdateRequest) message; - atlasClient.updateEntities(updateRequest.getEntities()); - break; + if (hasNext()) { + HookNotification.HookNotificationMessage message = consumer.next(); + try { + switch (message.getType()) { + case ENTITY_CREATE: + HookNotification.EntityCreateRequest createRequest = + (HookNotification.EntityCreateRequest) message; + atlasClient.createEntity(createRequest.getEntities()); + break; + + case ENTITY_PARTIAL_UPDATE: + HookNotification.EntityPartialUpdateRequest partialUpdateRequest = + (HookNotification.EntityPartialUpdateRequest) message; + atlasClient.updateEntity(partialUpdateRequest.getTypeName(), + partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(), + partialUpdateRequest.getEntity()); + break; + + case ENTITY_FULL_UPDATE: + HookNotification.EntityUpdateRequest updateRequest = + (HookNotification.EntityUpdateRequest) message; + atlasClient.updateEntities(updateRequest.getEntities()); + break; + } + } catch (Exception e) { + //todo handle failures + LOG.warn("Error handling message {}", message, e); + } } - } catch (Exception e) { - //todo handle failures - LOG.debug("Error handling message {}", message, e); + } catch(Throwable t) { + LOG.warn("Failure in NotificationHookConsumer", t); } } } @@ -150,7 +163,7 @@ public class NotificationHookConsumer implements Service { return false; } } - } catch (AtlasServiceException e) { + } catch (Throwable e) { LOG.info( "Handled AtlasServiceException while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 7d45a7b..0588ff9 100644 --- a/release-log.txt +++ b/release-log.txt @@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai) ATLAS-385 Support for Lineage for entities with SuperType as DataSet (anilsg via sumasai) ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags) ATLAS-386 Handle hive rename Table (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 6b2d5d1..9955f07 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -252,7 +252,7 @@ public final class GraphHelper { */ public Vertex getVertexForInstanceByUniqueAttribute(ClassType classType, IReferenceableInstance instance) throws AtlasException { - + LOG.debug("Checking if there is an instance with the same unique attributes for instance {}", instance); Vertex result = null; for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) { if (attributeInfo.isUnique) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 996f31b..2f3eb30 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -231,12 +231,14 @@ public final class TypedInstanceToGraphMapper { List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>(); for (IReferenceableInstance instance : instances) { + LOG.debug("Discovering instance to create/update for {}", instance); ITypedReferenceableInstance newInstance; Id id = instance.getId(); if (!idToVertexMap.containsKey(id)) { Vertex instanceVertex; if (id.isAssigned()) { // has a GUID + LOG.debug("Instance {} has an assigned id", instance.getId()._getId()); instanceVertex = graphHelper.getVertexForGUID(id.id); if (!(instance instanceof ReferenceableInstance)) { throw new IllegalStateException( @@ -252,6 +254,7 @@ public final class TypedInstanceToGraphMapper { //no entity with the given unique attribute, create new if (instanceVertex == null) { + LOG.debug("Creating new vertex for instance {}", instance); newInstance = classType.convert(instance, Multiplicity.REQUIRED); instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames()); instancesToCreate.add(newInstance); @@ -260,6 +263,7 @@ public final class TypedInstanceToGraphMapper { mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE); } else { + LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance); if (!(instance instanceof ReferenceableInstance)) { throw new IllegalStateException( String.format("%s is not of type ITypedReferenceableInstance", instance)); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/typesystem/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties index d475d7e..702c6f2 100644 --- a/typesystem/src/main/resources/application.properties +++ b/typesystem/src/main/resources/application.properties @@ -66,6 +66,7 @@ atlas.kafka.bootstrap.servers=localhost:19027 atlas.kafka.data=${sys:atlas.data}/kafka atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.sync.time.ms=20 +atlas.kafka.consumer.timeout.ms=100 atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.hook.group.id=atlas atlas.kafka.entities.group.id=atlas_entities http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index b2501ec..cd4e743 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -35,10 +35,8 @@ import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.web.resources.BaseResourceIT; import org.apache.atlas.web.util.Servlets; -import org.junit.AfterClass; import org.testng.Assert; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -48,7 +46,9 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; /** * Entity Notification Integration Tests. @@ -62,9 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT { private final String TABLE_NAME = "table" + randomString(); @Inject private NotificationInterface notificationInterface; - private EntityNotificationConsumer notificationConsumer; private Id tableId; private String traitName; + private NotificationConsumer<EntityNotification> notificationConsumer; @BeforeClass public void setUp() throws Exception { @@ -74,19 +74,7 @@ public class EntityNotificationIT extends BaseResourceIT { List<NotificationConsumer<EntityNotification>> consumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - NotificationConsumer<EntityNotification> consumer = consumers.iterator().next(); - notificationConsumer = new EntityNotificationConsumer(consumer); - notificationConsumer.start(); - } - - @AfterClass - public void tearDown() { - notificationConsumer.stop(); - } - - @BeforeMethod - public void setupTest() { - notificationConsumer.reset(); + notificationConsumer = consumers.iterator().next(); } @Test @@ -97,17 +85,8 @@ public class EntityNotificationIT extends BaseResourceIT { final String guid = tableId._getId(); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - - EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); - - assertNotNull(entityNotification); - assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType()); - - IReferenceableInstance entity = entityNotification.getEntity(); - - assertEquals(HIVE_TABLE_TYPE, entity.getTypeName()); - assertEquals(guid, entity.getId()._getId()); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid)); } @Test(dependsOnMethods = "testCreateEntity") @@ -119,19 +98,8 @@ public class EntityNotificationIT extends BaseResourceIT { serviceClient.updateEntityAttribute(guid, property, newValue); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - - EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); - - assertNotNull(entityNotification); - assertEquals(EntityNotification.OperationType.ENTITY_UPDATE, entityNotification.getOperationType()); - - IReferenceableInstance entity = entityNotification.getEntity(); - - assertEquals(HIVE_TABLE_TYPE, entity.getTypeName()); - assertEquals(guid, entity.getId()._getId()); - - assertEquals(newValue, entity.getValuesMap().get(property)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid)); } @Test(dependsOnMethods = "testCreateEntity") @@ -154,18 +122,10 @@ public class EntityNotificationIT extends BaseResourceIT { ClientResponse clientResponse = addTrait(guid, traitInstanceJSON); assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode()); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - - EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); - - assertNotNull(entityNotification); - assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType()); + EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid)); IReferenceableInstance entity = entityNotification.getEntity(); - - assertEquals(HIVE_TABLE_TYPE, entity.getTypeName()); - assertEquals(guid, entity.getId()._getId()); - assertTrue(entity.getTraits().contains(traitName)); List<IStruct> allTraits = entityNotification.getAllTraits(); @@ -178,9 +138,6 @@ public class EntityNotificationIT extends BaseResourceIT { assertTrue(allTraitNames.contains(superTraitName)); assertTrue(allTraitNames.contains(superSuperTraitName)); - // add another trait with the same super type to the entity - notificationConsumer.reset(); - String anotherTraitName = "Trait" + randomString(); createTrait(anotherTraitName, superTraitName); @@ -191,12 +148,8 @@ public class EntityNotificationIT extends BaseResourceIT { clientResponse = addTrait(guid, traitInstanceJSON); assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode()); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - - entityNotification = notificationConsumer.getLastEntityNotification(); - - assertNotNull(entityNotification); - assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType()); + entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid)); allTraits = entityNotification.getAllTraits(); allTraitNames = new LinkedList<>(); @@ -217,20 +170,10 @@ public class EntityNotificationIT extends BaseResourceIT { ClientResponse clientResponse = deleteTrait(guid, traitName); Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode()); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - - EntityNotification entityNotification = notificationConsumer.getLastEntityNotification(); - - assertNotNull(entityNotification); - assertEquals(EntityNotification.OperationType.TRAIT_DELETE, - entityNotification.getOperationType()); - - IReferenceableInstance entity = entityNotification.getEntity(); - - assertEquals(HIVE_TABLE_TYPE, entity.getTypeName()); - assertEquals(guid, entity.getId()._getId()); + EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid)); - assertFalse(entity.getTraits().contains(traitName)); + assertFalse(entityNotification.getEntity().getTraits().contains(traitName)); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index 3a4661c..e64e949 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -59,7 +59,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { sendHookMessage(new HookNotification.EntityCreateRequest(entity)); - waitFor(1000, new Predicate() { + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, @@ -80,7 +80,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { final Referenceable newEntity = new Referenceable(DATABASE_TYPE); newEntity.set("owner", randomString()); sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity)); - waitFor(1000, new Predicate() { + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); @@ -106,7 +106,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { newEntity.set("name", newName); sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity)); - waitFor(1000, new Predicate() { + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, @@ -136,7 +136,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { //updating unique attribute sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity)); - waitFor(1000, new Predicate() { + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java index aba191c..34abeab 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java @@ -24,7 +24,10 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; -import org.apache.atlas.*; +import kafka.consumer.ConsumerTimeoutException; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.typesystem.Referenceable; @@ -43,6 +46,7 @@ import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructTypeDefinition; import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeUtils; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.web.util.Servlets; @@ -272,6 +276,17 @@ public abstract class BaseResourceIT { boolean evaluate() throws Exception; } + public interface NotificationPredicate { + + /** + * Perform a predicate evaluation. + * + * @return the boolean result of the evaluation. + * @throws Exception thrown if the predicate evaluation could not evaluate. + */ + boolean evaluate(EntityNotification notification) throws Exception; + } + /** * Wait for a condition, expressed via a {@link Predicate} to become true. * @@ -292,49 +307,40 @@ public abstract class BaseResourceIT { } } - // ----- inner class : EntityNotificationConsumer -------------------------- - - protected static class EntityNotificationConsumer implements Runnable { - private final NotificationConsumer<EntityNotification> consumerIterator; - private EntityNotification entityNotification = null; - private boolean run; - - public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) { - this.consumerIterator = consumerIterator; - } - - @Override - public void run() { - while (run && consumerIterator.hasNext()) { - entityNotification = consumerIterator.next(); - } - } - - public void reset() { - entityNotification = null; - } - - public void start() { - Thread thread = new Thread(this); - run = true; - thread.start(); - } - - public void stop() { - run = false; - } - - public EntityNotification getLastEntityNotification() { - return entityNotification; - } - } - - protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception { + protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait, + final NotificationPredicate predicate) throws Exception { + final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null); + final long maxCurrentTime = System.currentTimeMillis() + maxWait; waitFor(maxWait, new Predicate() { @Override public boolean evaluate() throws Exception { - return notificationConsumer.getLastEntityNotification() != null; + try { + while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) { + EntityNotification notification = consumer.next(); + if (predicate.evaluate(notification)) { + pair.left = notification; + return true; + } + } + } catch(ConsumerTimeoutException e) { + //ignore + } + return false; } }); + return pair.left; + } + + protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType, + final String typeName, final String guid) { + return new NotificationPredicate() { + @Override + public boolean evaluate(EntityNotification notification) throws Exception { + return notification != null && + notification.getOperationType() == operationType && + notification.getEntity().getTypeName().equals(typeName) && + notification.getEntity().getId()._getId().equals(guid); + } + }; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java index a268196..73d26ce 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java @@ -48,12 +48,10 @@ import org.apache.atlas.web.util.Servlets; import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; -import org.junit.AfterClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -67,7 +65,6 @@ import java.util.Map; import java.util.UUID; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; /** @@ -89,7 +86,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT { @Inject private NotificationInterface notificationInterface; - private EntityNotificationConsumer notificationConsumer; + private NotificationConsumer<EntityNotification> notificationConsumer; @BeforeClass public void setUp() throws Exception { @@ -100,19 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT { List<NotificationConsumer<EntityNotification>> consumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - NotificationConsumer<EntityNotification> consumer = consumers.iterator().next(); - notificationConsumer = new EntityNotificationConsumer(consumer); - notificationConsumer.start(); - } - - @AfterClass - public void tearDown() { - notificationConsumer.stop(); - } - - @BeforeMethod - public void setupTest() { - notificationConsumer.reset(); + notificationConsumer = consumers.iterator().next(); } @Test @@ -158,20 +143,26 @@ public class EntityJerseyResourceIT extends BaseResourceIT { serviceClient.createEntity(db).getString(0); - waitForNotification(notificationConsumer, MAX_WAIT_TIME); - EntityNotification notification = notificationConsumer.getLastEntityNotification(); - assertNotNull(notification); - assertEquals(notification.getEntity().get("name"), dbName); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() { + @Override + public boolean evaluate(EntityNotification notification) throws Exception { + return notification != null && notification.getEntity().get("name").equals(dbName); + } + }); JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); assertEquals(results.length(), 1); //create entity again shouldn't create another instance with same unique attribute value - notificationConsumer.reset(); serviceClient.createEntity(db); try { - waitForNotification(notificationConsumer, MAX_WAIT_TIME); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() { + @Override + public boolean evaluate(EntityNotification notification) throws Exception { + return notification != null && notification.getEntity().get("name").equals(dbName); + } + }); fail("Expected time out exception"); } catch (Exception e) { //expected timeout
