http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java new file mode 100644 index 0000000..0a7b444 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -0,0 +1,1432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.CountRetryPolicyFactory; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.InfiniteRetryPolicyFactory; +import org.apache.asterix.active.NoRetryPolicyFactory; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.lock.MetadataLockManager; +import org.apache.asterix.runtime.utils.CcApplicationContext; +import org.apache.asterix.test.active.TestEventsListener.Behavior; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobIdFactory; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.application.CCServiceContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class ActiveEventsListenerTest { + + static TestClusterControllerActor clusterController; + static TestNodeControllerActor[] nodeControllers; + static TestUserActor[] users; + static String[] nodes = { "node1", "node2" }; + static ActiveNotificationHandler handler; + static String dataverseName = "Default"; + static String entityName = "entityName"; + static EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName); + static Dataset firstDataset; + static Dataset secondDataset; + static List<Dataset> allDatasets; + static TestEventsListener listener; + static IClusterStateManager clusterStateManager; + static CcApplicationContext appCtx; + static IStatementExecutor statementExecutor; + static IHyracksClientConnection hcc; + static MetadataProvider metadataProvider; + static IStorageComponentProvider componentProvider; + static JobIdFactory jobIdFactory; + static IMetadataLockManager lockManager = new MetadataLockManager(); + static AlgebricksAbsolutePartitionConstraint locations; + static ExecutorService executor; + + @Before + public void setUp() throws Exception { + jobIdFactory = new JobIdFactory(); + handler = new ActiveNotificationHandler(); + allDatasets = new ArrayList<>(); + firstDataset = new Dataset(dataverseName, "firstDataset", null, null, null, null, null, null, null, null, 0, 0); + secondDataset = + new Dataset(dataverseName, "secondDataset", null, null, null, null, null, null, null, null, 0, 0); + allDatasets.add(firstDataset); + allDatasets.add(secondDataset); + AtomicInteger threadCounter = new AtomicInteger(0); + executor = Executors.newCachedThreadPool( + r -> new Thread(r, "ClusterControllerServiceExecutor[" + threadCounter.getAndIncrement() + "]")); + clusterStateManager = Mockito.mock(IClusterStateManager.class); + Mockito.when(clusterStateManager.getState()).thenReturn(ClusterState.ACTIVE); + ClusterControllerService ccService = Mockito.mock(ClusterControllerService.class); + CCServiceContext ccServiceCtx = Mockito.mock(CCServiceContext.class); + appCtx = Mockito.mock(CcApplicationContext.class); + statementExecutor = Mockito.mock(IStatementExecutor.class); + hcc = Mockito.mock(IHyracksClientConnection.class); + Mockito.when(appCtx.getActiveNotificationHandler()).thenReturn(handler); + Mockito.when(appCtx.getMetadataLockManager()).thenReturn(lockManager); + Mockito.when(appCtx.getServiceContext()).thenReturn(ccServiceCtx); + Mockito.when(appCtx.getClusterStateManager()).thenReturn(clusterStateManager); + componentProvider = new StorageComponentProvider(); + Mockito.when(appCtx.getStorageComponentProvider()).thenReturn(componentProvider); + Mockito.when(ccServiceCtx.getControllerService()).thenReturn(ccService); + Mockito.when(ccService.getExecutor()).thenReturn(executor); + locations = new AlgebricksAbsolutePartitionConstraint(nodes); + metadataProvider = new MetadataProvider(appCtx, null); + metadataProvider.setConfig(new HashMap<>()); + clusterController = new TestClusterControllerActor("CC", handler, allDatasets); + nodeControllers = new TestNodeControllerActor[2]; + nodeControllers[0] = new TestNodeControllerActor(nodes[0], clusterController); + nodeControllers[1] = new TestNodeControllerActor(nodes[1], clusterController); + listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, + new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new InfiniteRetryPolicyFactory()); + users = new TestUserActor[3]; + users[0] = newUser("Till", appCtx); + users[1] = newUser("Mike", appCtx); + users[2] = newUser("Dmitry", appCtx); + } + + TestUserActor newUser(String name, CcApplicationContext appCtx) { + MetadataProvider actorMdProvider = new MetadataProvider(appCtx, null); + actorMdProvider.setConfig(new HashMap<>()); + return new TestUserActor("User: " + name, actorMdProvider, clusterController); + } + + @After + public void tearDown() throws Exception { + executor.shutdownNow(); + executor.awaitTermination(5, TimeUnit.SECONDS); + handler.stop(); + for (Actor user : users) { + user.stop(); + } + for (Actor nc : nodeControllers) { + nc.stop(); + } + clusterController.stop(); + } + + @Test + public void testStartWhenStartSucceed() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.SUCCEED); + Action action = users[0].startActivity(listener); + action.sync(); + assertSuccess(action); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testStopWhenStopSucceed() throws Exception { + testStartWhenStartSucceed(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action action = users[0].stopActivity(listener); + action.sync(); + assertSuccess(action); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @Test + public void testDoubleStopWhenStopSucceed() throws Exception { + testStartWhenStartSucceed(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action firstStop = users[0].stopActivity(listener); + Action secondStop = users[1].stopActivity(listener); + firstStop.sync(); + secondStop.sync(); + if (firstStop.hasFailed()) { + assertFailure(firstStop, ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED); + assertSuccess(secondStop); + } else { + assertSuccess(firstStop); + assertFailure(secondStop, ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED); + } + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @Test + public void testDoubleStartWhenStartSucceed() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.SUCCEED); + Action firstStart = users[0].startActivity(listener); + Action secondStart = users[1].startActivity(listener); + firstStart.sync(); + secondStart.sync(); + if (firstStart.hasFailed()) { + assertFailure(firstStart, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); + assertSuccess(secondStart); + } else { + assertSuccess(firstStart); + assertFailure(secondStart, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); + } + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testStopAfterDoubleStartWhenStartSucceedAndStopSucceed() throws Exception { + testDoubleStartWhenStartSucceed(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action action = users[2].stopActivity(listener); + action.sync(); + assertSuccess(action); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @Test + public void testSuspendFromStopped() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action action = users[0].suspendActivity(listener); + action.sync(); + assertSuccess(action); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + action = users[0].resumeActivity(listener); + action.sync(); + assertSuccess(action); + } + + @Test + public void testStartWhileSuspend() throws Exception { + listener.onStart(Behavior.SUCCEED); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action action = users[0].suspendActivity(listener); + action.sync(); + assertSuccess(action); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + // user[0] has the locks + Action startAction = users[1].startActivity(listener); + for (int i = 0; i < 100; i++) { + Assert.assertFalse(startAction.isDone()); + } + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + Action resumeAction = users[0].resumeActivity(listener); + resumeAction.sync(); + startAction.sync(); + assertSuccess(resumeAction); + assertSuccess(startAction); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testSuspendFromRunning() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action suspension = users[1].suspendActivity(listener); + suspension.sync(); + assertSuccess(suspension); + // resume + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + Action resumption = users[1].resumeActivity(listener); + resumption.sync(); + assertSuccess(resumption); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testSuspendFromRunningAndStopFail() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.FAIL_COMPILE); + Action suspension = users[1].suspendActivity(listener); + suspension.sync(); + Assert.assertTrue(suspension.hasFailed()); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testRecovery() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStart(Behavior.STEP_SUCCEED); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + subscriber.sync(); + Assert.assertNotNull(listener.getRecoveryTask()); + listener.allowStep(); + WaitForStateSubscriber running = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + running.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + Action stopAction = users[2].stopActivity(listener); + stopAction.sync(); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @Test + public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeSucceed() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.RUNNING_JOB_FAIL); + Action suspension = users[1].suspendActivity(listener); + suspension.sync(); + assertSuccess(suspension); + Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState()); + Assert.assertNull(listener.getRecoveryTask()); + listener.onStart(Behavior.SUCCEED); + Action resumption = users[1].resumeActivity(listener); + resumption.sync(); + assertSuccess(resumption); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeFailsCompileAndRecoveryStarts() + throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.RUNNING_JOB_FAIL); + Action suspension = users[1].suspendActivity(listener); + suspension.sync(); + assertSuccess(suspension); + Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState()); + Assert.assertNull(listener.getRecoveryTask()); + listener.onStart(Behavior.FAIL_COMPILE); + Action resumption = users[1].resumeActivity(listener); + resumption.sync(); + assertSuccess(resumption); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + } + + @Test + public void testSuspendFromRunningButJobFailWhileSuspendingThenResumeFailsRuntimeAndRecoveryStarts() + throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.RUNNING_JOB_FAIL); + Action suspension = users[1].suspendActivity(listener); + suspension.sync(); + assertSuccess(suspension); + Assert.assertEquals(ActivityState.TEMPORARILY_FAILED, listener.getState()); + Assert.assertNull(listener.getRecoveryTask()); + listener.onStart(Behavior.FAIL_RUNTIME); + Action resumption = users[1].resumeActivity(listener); + resumption.sync(); + assertSuccess(resumption); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + } + + @Test + public void testStopWhileSuspended() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.STEP_SUCCEED); + Action suspension = users[1].suspendActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)); + subscriber.sync(); + Action stopping = users[0].stopActivity(listener); + listener.allowStep(); + listener.allowStep(); + suspension.sync(); + assertSuccess(suspension); + users[1].resumeActivity(listener); + stopping.sync(); + assertSuccess(stopping); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testRecoveryFailureAfterOneAttemptCompilationFailure() throws Exception { + handler.unregisterListener(listener); + listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, + new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new CountRetryPolicyFactory(1)); + testStartWhenStartSucceed(); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber permFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + listener.onStart(Behavior.FAIL_COMPILE); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Compilation Failure"))); + tempFailSubscriber.sync(); + permFailSubscriber.sync(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + } + + @Test + public void testStartAfterPermenantFailure() throws Exception { + testRecoveryFailureAfterOneAttemptCompilationFailure(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + listener.onStart(Behavior.SUCCEED); + WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + users[1].startActivity(listener); + subscriber.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testStopAfterStartAfterPermenantFailure() throws Exception { + testStartAfterPermenantFailure(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.SUCCEED); + WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); + users[1].stopActivity(listener); + subscriber.sync(); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testRecoveryFailureAfterOneAttemptRuntimeFailure() throws Exception { + handler.unregisterListener(listener); + listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, + new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new CountRetryPolicyFactory(1)); + testStartWhenStartSucceed(); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber permFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + listener.onStart(Behavior.FAIL_RUNTIME); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + tempFailSubscriber.sync(); + permFailSubscriber.sync(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testRecoveryFailure() throws Exception { + handler.unregisterListener(listener); + listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, + new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, NoRetryPolicyFactory.INSTANCE); + testStartWhenStartSucceed(); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber permFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.PERMANENTLY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + tempFailSubscriber.sync(); + permFailSubscriber.sync(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testStopDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber stopSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + users[0].stopActivity(listener); + listener.allowStep(); + runningSubscriber.sync(); + stopSubscriber.sync(); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testStopDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber secondTempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber stopSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + users[0].stopActivity(listener); + listener.allowStep(); + secondTempFailSubscriber.sync(); + stopSubscriber.sync(); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testStopDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber secondTempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber stopSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.STOPPED)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + users[0].stopActivity(listener); + listener.allowStep(); + secondTempFailSubscriber.sync(); + stopSubscriber.sync(); + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testStartDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + Action startAction = users[0].startActivity(listener); + listener.allowStep(); + runningSubscriber.sync(); + startAction.sync(); + assertFailure(startAction, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testStartDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber secondTempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + Action action = users[0].startActivity(listener); + listener.allowStep(); + secondTempFailSubscriber.sync(); + action.sync(); + assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); + } + + @SuppressWarnings("deprecation") + @Test + public void testStartDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber secondTempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Action action = users[0].startActivity(listener); + listener.allowStep(); + secondTempFailSubscriber.sync(); + action.sync(); + assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); + } + + @SuppressWarnings("deprecation") + @Test + public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + Action suspend = users[1].suspendActivity(listener); + listener.allowStep(); + runningSubscriber.sync(); + suspend.sync(); + assertSuccess(suspend); + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + listener.onStart(Behavior.SUCCEED); + Action resume = users[1].resumeActivity(listener); + resume.sync(); + assertSuccess(resume); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + Action suspend = users[1].suspendActivity(listener); + listener.allowStep(); + runningSubscriber.sync(); + suspend.sync(); + assertSuccess(suspend); + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + // Fix here + listener.onStart(Behavior.FAIL_COMPILE); + tempFailSubscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED)); + recoveringSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + Action resume = users[1].resumeActivity(listener); + resume.sync(); + assertSuccess(resume); + tempFailSubscriber.sync(); + recoveringSubscriber.sync(); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + runningSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + listener.onStart(Behavior.SUCCEED); + runningSubscriber.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + listener.onStop(Behavior.SUCCEED); + Action suspend = users[1].suspendActivity(listener); + listener.allowStep(); + runningSubscriber.sync(); + suspend.sync(); + assertSuccess(suspend); + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + // Fix here + listener.onStart(Behavior.FAIL_RUNTIME); + tempFailSubscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED)); + recoveringSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + Action resume = users[1].resumeActivity(listener); + resume.sync(); + assertSuccess(resume); + tempFailSubscriber.sync(); + recoveringSubscriber.sync(); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + runningSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + listener.onStart(Behavior.SUCCEED); + runningSubscriber.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testSuspendDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Assert.assertEquals(ActivityState.RECOVERING, listener.getState()); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action suspend = users[1].suspendActivity(listener); + listener.onStart(Behavior.FAIL_COMPILE); + listener.allowStep(); + tempFailSubscriber.sync(); + suspend.sync(); + assertSuccess(suspend); + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + tempFailSubscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED)); + recoveringSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + Action resume = users[1].resumeActivity(listener); + resume.sync(); + assertSuccess(resume); + tempFailSubscriber.sync(); + recoveringSubscriber.sync(); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + listener.onStart(Behavior.SUCCEED); + runningSubscriber.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testSuspendDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_RUNTIME); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Assert.assertEquals(ActivityState.RECOVERING, listener.getState()); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action suspend = users[1].suspendActivity(listener); + listener.onStart(Behavior.FAIL_RUNTIME); + listener.allowStep(); + tempFailSubscriber.sync(); + suspend.sync(); + assertSuccess(suspend); + Assert.assertEquals(ActivityState.SUSPENDED, listener.getState()); + tempFailSubscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.TEMPORARILY_FAILED)); + recoveringSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + Action resume = users[1].resumeActivity(listener); + resume.sync(); + assertSuccess(resume); + tempFailSubscriber.sync(); + recoveringSubscriber.sync(); + ActivityState state = listener.getState(); + Assert.assertTrue(state == ActivityState.RECOVERING || state == ActivityState.TEMPORARILY_FAILED); + Assert.assertNotNull(listener.getRecoveryTask()); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + listener.onStart(Behavior.SUCCEED); + runningSubscriber.sync(); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewShadowDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action add = users[1].addDataset(newDataset, listener); + listener.allowStep(); + runningSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewShadowDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action add = users[1].addDataset(newDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action add = users[1].addDataset(newDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testCreateNewShadowWhileStarting() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.STEP_SUCCEED); + Action startAction = users[0].startActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)); + subscriber.sync(); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action createDatasetAction = users[1].addDataset(newDataset, listener); + listener.allowStep(); + startAction.sync(); + assertSuccess(startAction); + createDatasetAction.sync(); + assertFailure(createDatasetAction, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testCreateNewShadowWhileRunning() throws Exception { + testStartWhenStartSucceed(); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action createDatasetAction = users[1].addDataset(newDataset, listener); + createDatasetAction.sync(); + assertFailure(createDatasetAction, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testCreateNewShadowWhileSuspended() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.STEP_SUCCEED); + Action suspension = users[1].suspendActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)); + subscriber.sync(); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action createDatasetAction = users[0].addDataset(newDataset, listener); + listener.allowStep(); + listener.allowStep(); + suspension.sync(); + assertSuccess(suspension); + users[1].resumeActivity(listener); + createDatasetAction.sync(); + assertFailure(createDatasetAction, ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testCreateNewShadowWhilePermanentFailure() throws Exception { + testRecoveryFailureAfterOneAttemptCompilationFailure(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action createDatasetAction = users[0].addDataset(newDataset, listener); + createDatasetAction.sync(); + assertSuccess(createDatasetAction); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(3, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteShadowDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Action drop = users[1].dropDataset(firstDataset, listener); + listener.allowStep(); + runningSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteShadowDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action drop = users[1].dropDataset(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action drop = users[1].dropDataset(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testDeleteShadowWhileStarting() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.STEP_SUCCEED); + Action startAction = users[0].startActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)); + subscriber.sync(); + Action dropDatasetAction = users[1].dropDataset(firstDataset, listener); + listener.allowStep(); + startAction.sync(); + assertSuccess(startAction); + dropDatasetAction.sync(); + assertFailure(dropDatasetAction, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testDeleteShadowWhileRunning() throws Exception { + testStartWhenStartSucceed(); + Action dropDatasetAction = users[1].dropDataset(firstDataset, listener); + dropDatasetAction.sync(); + assertFailure(dropDatasetAction, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testDeleteShadowWhilePermanentFailure() throws Exception { + testRecoveryFailureAfterOneAttemptCompilationFailure(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Action dropDatasetAction = users[0].dropDataset(secondDataset, listener); + dropDatasetAction.sync(); + assertSuccess(dropDatasetAction); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Assert.assertEquals(1, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @Test + public void testDeleteShadowWhileSuspended() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.STEP_SUCCEED); + Action suspension = users[1].suspendActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)); + subscriber.sync(); + Action dropDatasetAction = users[0].dropDataset(secondDataset, listener); + listener.allowStep(); + listener.allowStep(); + suspension.sync(); + assertSuccess(suspension); + users[1].resumeActivity(listener); + dropDatasetAction.sync(); + assertFailure(dropDatasetAction, ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + Assert.assertEquals(2, listener.getDatasets().size()); + Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewIndexDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Action add = users[1].addIndex(firstDataset, listener); + listener.allowStep(); + runningSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewIndexDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action add = users[1].addIndex(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @SuppressWarnings("deprecation") + @Test + public void testCreateNewIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_RUNTIME); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action add = users[1].addIndex(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @Test + public void testCreateNewIndexWhileStarting() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.STEP_SUCCEED); + Action startAction = users[0].startActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)); + subscriber.sync(); + Action add = users[1].addIndex(firstDataset, listener); + listener.allowStep(); + startAction.sync(); + assertSuccess(startAction); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testCreateNewIndexWhileRunning() throws Exception { + testStartWhenStartSucceed(); + Action add = users[1].addIndex(firstDataset, listener); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @Test + public void testCreateNewIndexWhilePermanentFailure() throws Exception { + testRecoveryFailureAfterOneAttemptCompilationFailure(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Action add = users[1].addIndex(firstDataset, listener); + add.sync(); + assertSuccess(add); + } + + @Test + public void testCreateNewIndexWhileSuspended() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.STEP_SUCCEED); + Action suspension = users[1].suspendActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)); + subscriber.sync(); + Action add = users[0].addIndex(firstDataset, listener); + listener.allowStep(); + listener.allowStep(); + suspension.sync(); + assertSuccess(suspension); + users[1].resumeActivity(listener); + add.sync(); + assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteIndexDuringRecoveryAttemptThatSucceeds() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_SUCCEED); + tempFailSubscriber.sync(); + WaitForStateSubscriber runningSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RUNNING)); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + Action drop = users[1].dropIndex(firstDataset, listener); + listener.allowStep(); + runningSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteIndexDuringRecoveryAttemptThatFailsCompile() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_COMPILE); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action drop = users[1].dropIndex(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeleteIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception { + testStartWhenStartSucceed(); + listener.onStart(Behavior.FAIL_COMPILE); + WaitForStateSubscriber tempFailSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, + Collections.singletonList(new HyracksDataException("Runtime Failure"))); + // recovery is ongoing + listener.onStart(Behavior.STEP_FAIL_RUNTIME); + tempFailSubscriber.sync(); + WaitForStateSubscriber recoveringSubscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.RECOVERING)); + recoveringSubscriber.sync(); + tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); + Action drop = users[1].dropIndex(firstDataset, listener); + listener.allowStep(); + tempFailSubscriber.sync(); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @Test + public void testDeleteIndexwWhileStarting() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.STEP_SUCCEED); + Action startAction = users[0].startActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, Collections.singleton(ActivityState.STARTING)); + subscriber.sync(); + Action drop = users[1].dropIndex(firstDataset, listener); + listener.allowStep(); + startAction.sync(); + assertSuccess(startAction); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testDeleteIndexWhileRunning() throws Exception { + testStartWhenStartSucceed(); + Action drop = users[1].dropIndex(firstDataset, listener); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + } + + @Test + public void testDeleteIndexWhilePermanentFailure() throws Exception { + testRecoveryFailureAfterOneAttemptCompilationFailure(); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + Action drop = users[1].dropIndex(firstDataset, listener); + drop.sync(); + assertSuccess(drop); + } + + @Test + public void testDeleteIndexWhileSuspended() throws Exception { + testStartWhenStartSucceed(); + // suspend + Assert.assertEquals(ActivityState.RUNNING, listener.getState()); + listener.onStop(Behavior.STEP_SUCCEED); + Action suspension = users[1].suspendActivity(listener); + WaitForStateSubscriber subscriber = + new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.SUSPENDING, ActivityState.SUSPENDED)); + subscriber.sync(); + Action drop = users[0].dropIndex(firstDataset, listener); + listener.allowStep(); + listener.allowStep(); + suspension.sync(); + assertSuccess(suspension); + users[1].resumeActivity(listener); + drop.sync(); + assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); + } + + @Test + public void testSuspendedAllActivities() throws Exception { + TestEventsListener[] additionalListeners = new TestEventsListener[3]; + for (int i = 0; i < additionalListeners.length; i++) { + String entityName = "entityName" + i; + EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, entityName); + ClusterControllerService ccService = Mockito.mock(ClusterControllerService.class); + CCServiceContext ccServiceCtx = Mockito.mock(CCServiceContext.class); + CcApplicationContext ccAppCtx = Mockito.mock(CcApplicationContext.class); + IStatementExecutor statementExecutor = Mockito.mock(IStatementExecutor.class); + IHyracksClientConnection hcc = Mockito.mock(IHyracksClientConnection.class); + Mockito.when(ccAppCtx.getActiveNotificationHandler()).thenReturn(handler); + Mockito.when(ccAppCtx.getMetadataLockManager()).thenReturn(lockManager); + Mockito.when(ccAppCtx.getServiceContext()).thenReturn(ccServiceCtx); + Mockito.when(ccAppCtx.getClusterStateManager()).thenReturn(clusterStateManager); + Mockito.when(ccServiceCtx.getControllerService()).thenReturn(ccService); + Mockito.when(ccService.getExecutor()).thenReturn(executor); + Mockito.when(ccAppCtx.getStorageComponentProvider()).thenReturn(componentProvider); + AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(nodes); + MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null); + metadataProvider.setConfig(new HashMap<>()); + additionalListeners[i] = listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, + entityId, new ArrayList<>(allDatasets), statementExecutor, ccAppCtx, hcc, locations, + new InfiniteRetryPolicyFactory()); + } + Action suspension = users[0].suspendAllActivities(handler); + suspension.sync(); + assertSuccess(suspension); + Action query = users[1].query(firstDataset, new Semaphore(1)); + query.sync(); + assertSuccess(query); + Dataset newDataset = + new Dataset(dataverseName, "newDataset", null, null, null, null, null, null, null, null, 0, 0); + Action addDataset = users[1].addDataset(newDataset, listener); + // blocked by suspension + Assert.assertFalse(addDataset.isDone()); + Action resumption = users[0].resumeAllActivities(handler); + resumption.sync(); + assertSuccess(resumption); + addDataset.sync(); + assertSuccess(addDataset); + } + + private void assertFailure(Action action, int errorCode) throws Exception { + HyracksDataException exception = action.getFailure(); + try { + Assert.assertTrue(action.hasFailed()); + Assert.assertNotNull(exception); + Assert.assertEquals(errorCode, exception.getErrorCode()); + } catch (Exception e) { + throw new Exception("Expected failure: " + errorCode + ". Found failure: " + exception); + } + } + + private void assertSuccess(Action action) throws Exception { + if (action.hasFailed()) { + System.err.println("Action failed while it was expected to succeed"); + action.getFailure().printStackTrace(); + throw action.getFailure(); + } + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index 956d111..f8baa0e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -21,23 +21,29 @@ package org.apache.asterix.test.active; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; +import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.algebra.base.ILangExtension.Language; +import org.apache.asterix.app.active.ActiveEntityEventsListener; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.metadata.IDataset; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.test.runtime.ExecutionTestUtil; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -65,14 +71,14 @@ public class ActiveStatsTest { EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity"); ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); - List<IDataset> datasetList = new ArrayList<>(); + List<Dataset> datasetList = new ArrayList<>(); AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" }); String requestedStats; CcApplicationContext appCtx = (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); - ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler(); + ActiveNotificationHandler activeJobNotificationHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); JobId jobId = new JobId(1); // Mock ActiveRuntime @@ -82,14 +88,19 @@ public class ActiveStatsTest { // Mock JobSpecification JobSpecification jobSpec = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) - .thenReturn(entityId); - + Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId); + + // Mock MetadataProvider + CCExtensionManager extensionManager = (CCExtensionManager) appCtx.getExtensionManager(); + IStatementExecutor statementExecutor = extensionManager + .getStatementExecutorFactory(appCtx.getServiceContext().getControllerService().getExecutor()) + .create(appCtx, Collections.emptyList(), Mockito.mock(SessionOutput.class), + extensionManager.getCompilationProvider(Language.SQLPP), appCtx.getStorageComponentProvider()); + MetadataProvider mdProvider = new MetadataProvider(appCtx, null); // Add event listener - ActiveEntityEventsListener eventsListener = new ActiveEntityEventsListener(appCtx, entityId, datasetList, - partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName()); - activeJobNotificationHandler.registerListener(eventsListener); - + ActiveEntityEventsListener eventsListener = new DummyFeedEventsListener(statementExecutor, appCtx, null, + entityId, datasetList, partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName(), + NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList()); // Register mock runtime NCAppRuntimeContext nc1AppCtx = (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext(); @@ -103,26 +114,30 @@ public class ActiveStatsTest { eventsListener.refreshStats(1000); requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); - WaitForStateSubscriber startingSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTING); - eventsListener.subscribe(startingSubscriber); + WaitForStateSubscriber startingSubscriber = + new WaitForStateSubscriber(eventsListener, Collections.singleton(ActivityState.STARTING)); // Update stats of created/started job without joined partition - activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); - activeLifecycleListener.notifyJobStart(jobId); + TestUserActor user = new TestUserActor("Xikui", mdProvider, null); + Action start = user.startActivity(eventsListener); startingSubscriber.sync(); + activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); + activeJobNotificationHandler.notifyJobStart(jobId); eventsListener.refreshStats(1000); requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); - // Fake partition message and notify eventListener - WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTED); - eventsListener.subscribe(startedSubscriber); ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null); partitionMessage.handle(appCtx); - startedSubscriber.sync(); + start.sync(); + if (start.hasFailed()) { + throw start.getFailure(); + } eventsListener.refreshStats(100000); requestedStats = eventsListener.getStats(); - Assert.assertTrue(requestedStats.contains(EXPECTED_STATS)); + if (!requestedStats.contains(EXPECTED_STATS)) { + throw new Exception("Expected stats to contain " + EXPECTED_STATS + " but found stats = " + requestedStats); + } ObjectMapper objectMapper = new ObjectMapper(); try { objectMapper.readTree(requestedStats); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java new file mode 100644 index 0000000..3f68651 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import org.apache.asterix.active.SingleThreadEventProcessor; +import org.apache.asterix.metadata.declared.MetadataProvider; + +class Actor extends SingleThreadEventProcessor<Action> { + + private final MetadataProvider actorMdProvider; + + public Actor(String name, MetadataProvider metadataProvider) { + super(Actor.class.getSimpleName() + ":" + name); + this.actorMdProvider = metadataProvider; + } + + @Override + protected void handle(Action action) throws Exception { + action.execute(actorMdProvider); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java new file mode 100644 index 0000000..961b731 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.active; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventSubscriber; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.app.active.FeedEventsListener; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.asterix.metadata.entities.FeedConnection; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class DummyFeedEventsListener extends FeedEventsListener { + + public DummyFeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, + IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, + AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory, + Feed feed, List<FeedConnection> feedConnections) throws HyracksDataException { + super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory, feed, + feedConnections); + } + + @Override + protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + IActiveEntityEventSubscriber eventSubscriber = + new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING)); + try { + eventSubscriber.sync(); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @Override + protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { + IActiveEntityEventSubscriber eventSubscriber = + new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.PERMANENTLY_FAILED)); + try { + eventSubscriber.sync(); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + return null; + } +}
