Repository: asterixdb Updated Branches: refs/heads/master 9c2e9f0ef -> 73983153c
[ASTERIXDB-1970][ING] Fix Active Stats Test - user model changes: no - storage format changes: no - interface changes: no details: - Active Stats Test fails because it issues many async calls and doesn't wait until the calls complete. In addition, it bypasses the active event inbox incorrectly. Change-Id: I518a6b1f7d8e86703ee5537869d207e609a7c293 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1865 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Xikui Wang <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/73983153 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/73983153 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/73983153 Branch: refs/heads/master Commit: 73983153c8bef1ce651047f5fc9def55aea7ec5d Parents: 9c2e9f0 Author: Abdullah Alamoudi <[email protected]> Authored: Sun Jul 2 12:31:16 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Sun Jul 2 13:51:59 2017 -0700 ---------------------------------------------------------------------- .../active/ActiveJobNotificationHandler.java | 1 + .../asterix/test/active/ActiveMessageTest.java | 141 ------------------ .../asterix/test/active/ActiveStatsTest.java | 144 +++++++++++++++++++ .../asterix/common/exceptions/ErrorCode.java | 4 +- .../main/resources/asx_errormsg/en.properties | 3 +- .../management/ActiveEntityEventsListener.java | 4 + .../feed/watch/WaitForStateSubscriber.java | 3 - 7 files changed, 151 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java index b4ed8e5..d2b8a89 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java @@ -107,6 +107,7 @@ public class ActiveJobNotificationHandler implements Runnable { LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : "Inactive")); IActiveEntityEventsListener listener = entityEventListeners.get(entityId); if (listener != null) { + // It is okay to bypass the event inbox in this case because we know this is the first event for this entity listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } LOGGER.log(Level.FINER, "Listener was notified" + jobId); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java deleted file mode 100644 index 2dc1782..0000000 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveMessageTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.test.active; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.asterix.active.ActiveEvent; -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.ActiveLifecycleListener; -import org.apache.asterix.active.ActiveRuntimeId; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveRuntime; -import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.asterix.app.nc.NCAppRuntimeContext; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.metadata.IDataset; -import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; -import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; -import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.test.runtime.ExecutionTestUtil; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class ActiveMessageTest { - - protected boolean cleanUp = true; - private static String EXPECTED_STATS = "Mock stats"; - - @Before - 
public void setUp() throws Exception { - ExecutionTestUtil.setUp(cleanUp); - } - - @Test - public void refreshStatsTest() throws HyracksException { - // Entities to be used - EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity"); - ActiveRuntimeId activeRuntimeId = - new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); - List<IDataset> datasetList = new ArrayList<>(); - AlgebricksAbsolutePartitionConstraint partitionConstraint = - new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" }); - String requestedStats; - CcApplicationContext appCtx = - (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); - ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); - ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler(); - JobId jobId = new JobId(1); - - // Mock ActiveRuntime - IActiveRuntime mockRuntime = Mockito.mock(IActiveRuntime.class); - Mockito.when(mockRuntime.getRuntimeId()).thenReturn(activeRuntimeId); - Mockito.when(mockRuntime.getStats()).thenReturn(EXPECTED_STATS); - - // Mock JobSpecification - JobSpecification jobSpec = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) - .thenReturn(entityId); - - // Add event listener - ActiveEntityEventsListener eventsListener = - new ActiveEntityEventsListener(appCtx, entityId, datasetList, partitionConstraint, - FeedIntakeOperatorNodePushable.class.getSimpleName()); - activeJobNotificationHandler.registerListener(eventsListener); - - // Register mock runtime - NCAppRuntimeContext nc1AppCtx = - (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext(); - nc1AppCtx.getActiveManager().registerRuntime(mockRuntime); - - // Check init stats - requestedStats = eventsListener.getStats(); - 
Assert.assertTrue(requestedStats.equals("N/A")); - - // Update stats of not-started job - eventsListener.refreshStats(1000); - requestedStats = eventsListener.getStats(); - Assert.assertTrue(requestedStats.equals("N/A")); - - // Update stats of created/started job without joined partition - activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); - activeLifecycleListener.notifyJobStart(jobId); - eventsListener.refreshStats(1000); - requestedStats = eventsListener.getStats(); - Assert.assertTrue(requestedStats.equals("N/A")); - - // Fake partition message and notify eventListener - ActivePartitionMessage partitionMessage = - new ActivePartitionMessage(activeRuntimeId, jobId, ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, - null); - eventsListener.notify(new ActiveEvent(jobId, ActiveEvent.Kind.PARTITION_EVENT, entityId, partitionMessage)); - eventsListener.refreshStats(100000); - requestedStats = eventsListener.getStats(); - Assert.assertTrue(requestedStats.contains(EXPECTED_STATS)); - ObjectMapper objectMapper = new ObjectMapper(); - try { - objectMapper.readTree(requestedStats); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(); - } - // Ask for runtime that is not registered - HyracksDataException expectedException = null; - nc1AppCtx.getActiveManager().deregisterRuntime(activeRuntimeId); - try { - eventsListener.refreshStats(100000); - } catch (HyracksDataException e) { - expectedException = e; - } - Assert.assertTrue(expectedException != null - && expectedException.getErrorCode() == ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java new file mode 100644 index 0000000..e932006 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.test.active; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.active.ActiveJobNotificationHandler; +import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveRuntime; +import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.app.nc.NCAppRuntimeContext; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.asterix.external.feed.management.ActiveEntityEventsListener; +import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; +import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; +import org.apache.asterix.runtime.utils.CcApplicationContext; +import org.apache.asterix.test.runtime.ExecutionTestUtil; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class ActiveStatsTest { + + protected boolean cleanUp = true; + private static String EXPECTED_STATS = "Mock stats"; + + @Before + public void setUp() throws Exception { + ExecutionTestUtil.setUp(cleanUp); + } + + @Test + public void refreshStatsTest() throws Exception { + // Entities to be used + EntityId entityId = new EntityId("MockExtension", "MockDataverse", "MockEntity"); + ActiveRuntimeId activeRuntimeId = + new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); + List<IDataset> datasetList = new 
ArrayList<>(); + AlgebricksAbsolutePartitionConstraint partitionConstraint = + new AlgebricksAbsolutePartitionConstraint(new String[] { "asterix_nc1" }); + String requestedStats; + CcApplicationContext appCtx = + (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); + ActiveLifecycleListener activeLifecycleListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener(); + ActiveJobNotificationHandler activeJobNotificationHandler = activeLifecycleListener.getNotificationHandler(); + JobId jobId = new JobId(1); + + // Mock ActiveRuntime + IActiveRuntime mockRuntime = Mockito.mock(IActiveRuntime.class); + Mockito.when(mockRuntime.getRuntimeId()).thenReturn(activeRuntimeId); + Mockito.when(mockRuntime.getStats()).thenReturn(EXPECTED_STATS); + + // Mock JobSpecification + JobSpecification jobSpec = Mockito.mock(JobSpecification.class); + Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) + .thenReturn(entityId); + + // Add event listener + ActiveEntityEventsListener eventsListener = new ActiveEntityEventsListener(appCtx, entityId, datasetList, + partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName()); + activeJobNotificationHandler.registerListener(eventsListener); + + // Register mock runtime + NCAppRuntimeContext nc1AppCtx = + (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext(); + nc1AppCtx.getActiveManager().registerRuntime(mockRuntime); + + // Check init stats + requestedStats = eventsListener.getStats(); + Assert.assertTrue(requestedStats.equals("N/A")); + + // Update stats of not-started job + eventsListener.refreshStats(1000); + requestedStats = eventsListener.getStats(); + Assert.assertTrue(requestedStats.equals("N/A")); + WaitForStateSubscriber startingSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTING); + eventsListener.subscribe(startingSubscriber); + // Update stats of created/started job without 
joined partition + activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); + activeLifecycleListener.notifyJobStart(jobId); + startingSubscriber.sync(); + eventsListener.refreshStats(1000); + requestedStats = eventsListener.getStats(); + Assert.assertTrue(requestedStats.equals("N/A")); + // Fake partition message and notify eventListener + WaitForStateSubscriber startedSubscriber = new WaitForStateSubscriber(eventsListener, ActivityState.STARTED); + eventsListener.subscribe(startedSubscriber); + ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, + ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null); + partitionMessage.handle(appCtx); + startedSubscriber.sync(); + eventsListener.refreshStats(100000); + requestedStats = eventsListener.getStats(); + Assert.assertTrue(requestedStats.contains(EXPECTED_STATS)); + ObjectMapper objectMapper = new ObjectMapper(); + try { + objectMapper.readTree(requestedStats); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + // Ask for runtime that is not registered + HyracksDataException expectedException = null; + nc1AppCtx.getActiveManager().deregisterRuntime(activeRuntimeId); + try { + eventsListener.refreshStats(100000); + } catch (HyracksDataException e) { + expectedException = e; + } + Assert.assertNotNull(expectedException); + Assert.assertEquals(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, expectedException.getErrorCode()); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index d264008..7c82ca3 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -68,7 +68,6 @@ public class ErrorCode { public static final int POLYGON_3_POINTS = 25; public static final int POLYGON_INVALID = 26; - public static final int INSTANTIATION_ERROR = 100; // Compilation errors @@ -195,12 +194,11 @@ public class ErrorCode { public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_NOT_PROPERLY_CONFIGURED = 3081; public static final int FEED_METADATA_SOCKET_ADAPTOR_SOCKET_INVALID_HOST_NC = 3082; public static final int PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING = 3083; - public static final int CANNOT_WAIT_FOR_STATE = 3084; + public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3084; public static final int FEED_UNKNOWN_ADAPTER_NAME = 3085; public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086; public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087; public static final int ACTIVE_MANAGER_INVALID_RUNTIME = 3088; - public static final int CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY = 3089; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 11334f1..facf1a9 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -183,12 +183,11 @@ 3081 = socket is not properly configured 3082 = Invalid %1$s %2$s as it is not part of the AsterixDB cluster. 
Valid choices are %3$s 3083 = Duplicate feed adaptor name: %1$s -3084 = Cannot wait for state %1$s. The only states that can be waited for are STARTED or STOPPED +3084 = Cannot subscribe to events of a failed active entity 3085 = Unknown Adapter Name. 3086 = Cannot find record reader %1$s with specified configuration 3087 = Cannot find function %1$s 3088 = %1$s is not a valid runtime Id -3089 = Cannot subscribe to events of a failed active entity # Lifecycle management errors 4000 = Partition id %1$d for node %2$s already in use by node %3$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java index cee6fa9..409c297 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java @@ -88,6 +88,7 @@ public class ActiveEntityEventsListener implements IActiveEntityEventsListener { this.numRegistered = 0; } + @Override public synchronized void notify(ActiveEvent event) { try { LOGGER.finer("EventListener is notified."); @@ -191,8 +192,11 @@ public class ActiveEntityEventsListener implements IActiveEntityEventsListener { @SuppressWarnings("unchecked") @Override public void refreshStats(long timeout) throws HyracksDataException { + LOGGER.log(Level.INFO, "refreshStats called"); synchronized (this) { if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) { + LOGGER.log(Level.INFO, "returning 
immediately since state = " + state + " and statsRequestState = " + + statsRequestState); return; } else { statsRequestState = RequestState.STARTED; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/73983153/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java index ea7e3ae..7bab421 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java @@ -33,9 +33,6 @@ public class WaitForStateSubscriber extends AbstractSubscriber { throws HyracksDataException { super(listener); this.targetState = targetState; - if (targetState != ActivityState.STARTED && targetState != ActivityState.STOPPED) { - throw new RuntimeDataException(ErrorCode.CANNOT_WAIT_FOR_STATE, targetState); - } listener.subscribe(this); }
