Repository: ambari Updated Branches: refs/heads/trunk cd39cde29 -> d8e97b172
AMBARI-12537. Blueprints Cluster configuration task thread should not wait indefinitely. (Laszlo Puskas via rnettleton) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d8e97b17 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d8e97b17 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d8e97b17 Branch: refs/heads/trunk Commit: d8e97b17210a79c82ffe00abd67c0c829027951f Parents: cd39cde Author: Bob Nettleton <[email protected]> Authored: Fri Nov 20 10:41:35 2015 -0500 Committer: Bob Nettleton <[email protected]> Committed: Fri Nov 20 10:41:47 2015 -0500 ---------------------------------------------------------------------- .../server/topology/AsyncCallableService.java | 124 ++++++++++++++ .../ambari/server/topology/TopologyManager.java | 72 ++++---- .../topology/AsyncCallableServiceTest.java | 166 +++++++++++++++++++ .../topology/ConfigureClusterTaskTest.java | 129 ++++++++++++++ .../server/topology/TopologyManagerTest.java | 16 +- 5 files changed, 474 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java new file mode 100644 index 0000000..fc7f190 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Calendar; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Callable service implementation for executing tasks asynchronously. + * The service repeatedly tries to execute the provided task till it successfully completes, or the provided timeout + * interval is exceeded. 
+ * + * @param <T> the type returned by the task to be executed + */ +public class AsyncCallableService<T> implements Callable<Boolean> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncCallableService.class); + + // task execution is done on a separate thread provided by this executor + private final ScheduledExecutorService executorService; + + // the task to be executed + private final Callable<T> task; + + // the total time allowed for the task to be executed (retries will happen within this timeframe), in + // milliseconds + private final long timeout; + + // the delay between two consecutive execution trials in milliseconds + private final long delay; + + + private Boolean serviceResult = Boolean.FALSE; + + public AsyncCallableService(Callable<T> task, long timeout, long delay, + ScheduledExecutorService executorService) { + this.task = task; + this.executorService = executorService; + this.timeout = timeout; + this.delay = delay; + } + + @Override + public Boolean call() { + + long startTimeInMillis = Calendar.getInstance().getTimeInMillis(); + LOG.info("Task execution started at: {}", startTimeInMillis); + + // task execution started on a new thread + Future future = executorService.submit(task); + + while (!taskCompleted(future)) { + if (!timeoutExceeded(startTimeInMillis)) { + LOG.debug("Retrying task execution in [ {} ] milliseconds.", delay); + future = executorService.schedule(task, delay, TimeUnit.MILLISECONDS); + } else { + LOG.debug("Timout exceeded, cancelling task ... "); + // making sure the task gets cancelled! + if (!future.isDone()) { + boolean cancelled = future.cancel(true); + LOG.debug("Task cancelled: {}", cancelled); + } else { + LOG.debug("Task already done."); + } + LOG.info("Timeout exceeded, task execution won't be retried!"); + // exit the "retry" loop! 
+ break; + } + } + + LOG.info("Exiting Async task execution with the result: [ {} ]", serviceResult); + return serviceResult; + } + + private boolean taskCompleted(Future<T> future) { + boolean completed = false; + try { + LOG.debug("Retrieving task execution result ..."); + // should receive task execution result within the configured timeout interval + // exceptions thrown from the task are propagated here + T taskResult = future.get(timeout, TimeUnit.MILLISECONDS); + + // task failures are expected to be reported as exceptions + LOG.debug("Task successfully executed: {}", taskResult); + setServiceResult(Boolean.TRUE); + completed = true; + } catch (Exception e) { + // NOTE: the future is not necessarily done here (e.g. when get() timed out or the wait was interrupted) + LOG.info("Exception during task execution: ", e); + } + return completed; + } + + private boolean timeoutExceeded(long startTimeInMillis) { + return timeout < Calendar.getInstance().getTimeInMillis() - startTimeInMillis; + } + + private void setServiceResult(Boolean serviceResult) { + this.serviceResult = serviceResult; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 9e902a7..9b6c9ad 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -71,6 +71,9 @@ public class TopologyManager { public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED"; public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential"; + private static final String CLUSTER_ENV_CONFIG_TYPE_NAME = "cluster-env"; + private static final String 
CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout"; + private PersistedState persistedState; private ExecutorService executor = Executors.newSingleThreadExecutor(); private Collection<String> hostsToIgnore = new HashSet<String>(); @@ -675,54 +678,65 @@ public class TopologyManager { * @param configurationRequest configuration request to be executed */ private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { - executor.execute(new ConfigureClusterTask(topology, configurationRequest)); + + String timeoutStr = topology.getConfiguration().getPropertyValue(CLUSTER_ENV_CONFIG_TYPE_NAME, + CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME); + + long timeout = 1000 * 60 * 30; // 30 minutes + long delay = 100; //ms + + if (timeoutStr != null) { + timeout = Long.parseLong(timeoutStr); + LOG.debug("ConfigureClusterTask timeout set to: {}", timeout); + } else { + LOG.debug("No timeout constraints found in configuration. 
Wired defaults will be applied."); + } + + ConfigureClusterTask configureClusterTask = new ConfigureClusterTask(topology, configurationRequest); + AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService(configureClusterTask, timeout, delay, + Executors.newScheduledThreadPool(1)); + + executor.submit(asyncCallableService); } - private class ConfigureClusterTask implements Runnable { + // package protected for testing purposes + static class ConfigureClusterTask implements Callable<Boolean> { + private ClusterConfigurationRequest configRequest; private ClusterTopology topology; - public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) { this.configRequest = configRequest; this.topology = topology; } @Override - public void run() { + public Boolean call() throws Exception { LOG.info("TopologyManager.ConfigureClusterTask: Entering"); - boolean completed = false; - boolean interrupted = false; - Collection<String> requiredHostGroups = getTopologyRequiredHostGroups(); - while (!completed && !interrupted) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - interrupted = true; - LOG.info("TopologyManager.ConfigureClusterTask: waiting thread interrupted by exception", e); - // reset interrupted flag on thread - Thread.interrupted(); - } - completed = areRequiredHostGroupsResolved(requiredHostGroups); - } - LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster Configuration can now begin"); + if (!areRequiredHostGroupsResolved(requiredHostGroups)) { + LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " + + "satisfied"); + throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " + + "request processing not yet satisfied"); + } - if (!interrupted) { - try { - LOG.info("TopologyManager.ConfigureClusterTask: Setting Configuration on cluster"); - // sets updated 
configuration on topology and cluster + try { + LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " + + "Configuration can now begin"); configRequest.process(); } catch (Exception e) { - // just logging and allowing config flag to be reset LOG.error("TopologyManager.ConfigureClusterTask: " + - "An exception occurred while attempting to process cluster configs and set on cluster: " + e); - e.printStackTrace(); - } + "An exception occurred while attempting to process cluster configs and set on cluster: ", e); + + // this will signal an unsuccessful run, retry will be triggered if required + throw new Exception(e); } + LOG.info("TopologyManager.ConfigureClusterTask: Exiting"); + return true; } /** @@ -736,8 +750,8 @@ public class TopologyManager { requiredHostGroups = configRequest.getRequiredHostGroups(); } catch (RuntimeException e) { // just log error and allow config topology update - LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required host groups for config update " + e); - e.printStackTrace(); + LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" + + " host groups for config update ", e); requiredHostGroups = Collections.emptyList(); } return requiredHostGroups; http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java new file mode 100644 index 0000000..00d6fe4 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import org.easymock.EasyMockRule; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class AsyncCallableServiceTest { + public static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallableService.class); + + @Rule + public EasyMockRule mocks = new EasyMockRule(this); + + @Mock(type = MockType.STRICT) + private Callable<Boolean> taskMock; + + private ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor(); + + private long timeout; + + private long delay; + + private AsyncCallableService<Boolean> asyncCallableService; + + @Before + public void setup() { + // default timeout, overwrite it if necessary + timeout = 1000; + + // default delay between tries + delay = 500; + } + + + @Test + public void 
testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception { + // GIVEN + // the task to be executed never completes successfully + expect(taskMock.call()).andThrow(new IllegalStateException("Prerequisites are not yet satisfied!")).anyTimes(); + replay(taskMock); + asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(1)); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + verify(); + Assert.assertNotNull("Service result must not be null", serviceResult); + Assert.assertFalse("The expected boolean result is 'false'!", serviceResult); + } + + @Test + public void testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws Exception { + // GIVEN + //the task call hangs, it doesn't return within a reasonable period of time + Callable<Boolean> hangingTask = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + Thread.sleep(10000000); + return false; + } + }; + + asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2)); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + Assert.assertNotNull("Service result must not be null", serviceResult); + Assert.assertFalse("The expected boolean result is 'false'!", serviceResult); + } + + @Test + public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception { + // GIVEN + // the task to be executed completes successfully on the first attempt + expect(taskMock.call()).andReturn(Boolean.TRUE).times(1); + replay(taskMock); + asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2)); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + verify(); + Assert.assertNotNull("Service result must not be null", serviceResult); + Assert.assertTrue(serviceResult); + } + + @Test + public void 
testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception { + // GIVEN + + // the task to be executed always throws an exception + expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times + (2,3); + replay(taskMock); + asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2)); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + verify(); + // THEN + Assert.assertNotNull("Service result must not be null", serviceResult); + Assert.assertFalse("The expected boolean result is 'false'!", serviceResult); + + } + + + @Test + public void testShouldAsyncCallableServiceRetryExecutionWhenTaskThrowsException() throws Exception { + // GIVEN + // the task fails immediately by throwing an exception on every call + Callable<Boolean> hangingTask = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + throw new IllegalStateException("****************** TESTING ****************"); + } + }; + + asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2)); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + Assert.assertNotNull("Service result must not be null", serviceResult); + Assert.assertFalse("The expected boolean result is 'false'!", serviceResult); + + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java new file mode 100644 index 0000000..7bfa591 --- /dev/null +++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import junit.framework.Assert; +import org.easymock.EasyMockRule; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; + +/** + * Unit test for the ConfigureClusterTask class. + * As business methods of this class don't return values, the assertions are made by verifying method calls on mocks. + * Thus having strict mocks is essential! 
+ */ +public class ConfigureClusterTaskTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigureClusterTaskTest.class); + + @Rule + public EasyMockRule mocks = new EasyMockRule(this); + + @Mock(type = MockType.STRICT) + private ClusterConfigurationRequest clusterConfigurationRequest; + + @Mock(type = MockType.STRICT) + private ClusterTopology clusterTopology; + + private TopologyManager.ConfigureClusterTask testSubject; + + + @Before + public void before() { + reset(clusterConfigurationRequest, clusterTopology); + testSubject = new TopologyManager.ConfigureClusterTask(clusterTopology, clusterConfigurationRequest); + + } + + @Test + public void testShouldConfigureClusterTaskLogicBeExecutedWhenRequiredHostgroupsAreResolved() throws + Exception { + // GIVEN + // is it OK to handle the non existence of hostgroups as a success?! + expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST); + expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP); + + // this is only called if the "prerequisites" are satisfied + clusterConfigurationRequest.process(); + + replay(clusterConfigurationRequest, clusterTopology); + + // WHEN + Boolean result = testSubject.call(); + + // THEN + verify(); + Assert.assertTrue(result); + } + + @Test + public void testsShouldConfigureClusterTaskExecuteWhenCalledFromAsyncCallableService() throws Exception { + // GIVEN + // is it OK to handle the non existence of hostgroups as a success?! 
+ expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST); + expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP); + + // this is only called if the "prerequisites" are satisfied + clusterConfigurationRequest.process(); + + replay(clusterConfigurationRequest, clusterTopology); + + AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, Executors + .newScheduledThreadPool(3)); + + // WHEN + asyncService.call(); + // THEN + + + } + + private Collection<String> mockRequiredHostGroups() { + return Arrays.asList("test-hostgroup-1"); + } + + private Map<String, HostGroupInfo> mockHostGroupInfo() { + Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>(); + HostGroupInfo hostGroupInfo = new HostGroupInfo("test-hostgroup-1"); + hostGroupInfo.addHost("test-host-1"); + hostGroupInfo.setRequestedCount(2); + + hostGroupInfoMap.put("test-hostgroup-1", hostGroupInfo); + return hostGroupInfoMap; + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 3326bd4..47169f4 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -48,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; @@ -117,6 +118,10 @@ public class TopologyManagerTest { private 
ClusterController clusterController; @Mock(type = MockType.STRICT) private ResourceProvider resourceProvider; + + @Mock(type = MockType.STRICT) + private Future mockFuture; + private final Configuration stackConfig = new Configuration(new HashMap<String, Map<String, String>>(), new HashMap<String, Map<String, Map<String, String>>>()); private final Configuration bpConfiguration = new Configuration(new HashMap<String, Map<String, String>>(), @@ -295,7 +300,8 @@ public class TopologyManagerTest { expect(clusterController.ensureResourceProvider(anyObject(Resource.Type.class))).andReturn(resourceProvider); - executor.execute(capture(updateConfigTaskCapture)); + expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture); + expectLastCall().times(1); expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, @@ -305,7 +311,8 @@ public class TopologyManagerTest { replay(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory, logicalRequest, configurationRequest, configurationRequest2, configurationRequest3, requestStatusResponse, executor, - persistedState, securityConfigurationFactory, credentialStoreService, clusterController, resourceProvider); + persistedState, securityConfigurationFactory, credentialStoreService, clusterController, resourceProvider, + mockFuture); Class clazz = TopologyManager.class; @@ -321,10 +328,11 @@ public class TopologyManagerTest { public void tearDown() { verify(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory, logicalRequest, configurationRequest, configurationRequest2, configurationRequest3, - requestStatusResponse, executor, persistedState); + requestStatusResponse, executor, persistedState, mockFuture); + reset(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory, logicalRequest, configurationRequest, configurationRequest2, configurationRequest3, - requestStatusResponse, executor, persistedState); + 
requestStatusResponse, executor, persistedState, mockFuture); } @Test
