Repository: incubator-atlas Updated Branches: refs/heads/master e4cc16ac7 -> 3a0865ad0
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3a0865ad/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index f06f791..b86c693 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -17,13 +17,20 @@ */ package org.apache.atlas.notification; -import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.LocalAtlasClient; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.repository.converters.AtlasInstanceConverter; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.EntityStream; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -31,16 +38,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; @@ -50,26 +52,41 @@ public class NotificationHookConsumerTest { private NotificationInterface notificationInterface; @Mock - private LocalAtlasClient atlasClient; - - @Mock private Configuration configuration; @Mock private ExecutorService executorService; + @Mock + private AtlasEntityStore atlasEntityStore; + + @Mock + private ServiceState serviceState; + + @Mock + private AtlasInstanceConverter instanceConverter; + + @Mock + private AtlasTypeRegistry typeRegistry; + @BeforeMethod - public void setup() { + public void setup() throws AtlasBaseException { MockitoAnnotations.initMocks(this); + AtlasType mockType = mock(AtlasType.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); + AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.getEntities(anyList())).thenReturn(mockEntity); + EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); } @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenReturn(true); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -78,11 +95,16 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenReturn(false, false, false, true); + + when(serviceState.getState()) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.PASSIVE) + .thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -92,13 +114,15 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); when(message.getUser()).thenReturn("user"); when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); + Referenceable mock = mock(Referenceable.class); + when(message.getEntities()).thenReturn(Arrays.asList(mock)); hookConsumer.handleMessage(message); @@ -106,15 +130,17 @@ public class NotificationHookConsumerTest { } @Test - public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException { + public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasClient); + new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", new ArrayList<Referenceable>()); - when(atlasClient.createEntity(any(List.class))). - thenThrow(new RuntimeException("Simulating exception in processing message")); + HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", + new ArrayList<Referenceable>() { + { add(mock(Referenceable.class)); } + }); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); hookConsumer.handleMessage(message); verifyZeroInteractions(consumer); @@ -122,24 +148,12 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); - when(atlasClient.isServerReady()).thenReturn(false); - - assertFalse(hookConsumer.serverAvailable(timer)); - } - - @Test - public void testConsumerProceedsWithFalseOnAtlasServiceException() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); - when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, - new Exception())); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); assertFalse(hookConsumer.serverAvailable(timer)); } @@ -152,7 +166,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); @@ -167,7 +181,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); verifyZeroInteractions(notificationInterface); } @@ -181,7 +195,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); @@ -196,7 +210,7 @@ public class NotificationHookConsumerTest { consumers.add(mock(NotificationConsumer.class)); when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasClient); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); verify(notificationInterface).close();
