Repository: ambari
Updated Branches:
  refs/heads/trunk cd39cde29 -> d8e97b172


AMBARI-12537. Blueprints Cluster configuration task thread should not wait 
indefinitely. (Laszlo Puskas via rnettleton)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d8e97b17
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d8e97b17
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d8e97b17

Branch: refs/heads/trunk
Commit: d8e97b17210a79c82ffe00abd67c0c829027951f
Parents: cd39cde
Author: Bob Nettleton <[email protected]>
Authored: Fri Nov 20 10:41:35 2015 -0500
Committer: Bob Nettleton <[email protected]>
Committed: Fri Nov 20 10:41:47 2015 -0500

----------------------------------------------------------------------
 .../server/topology/AsyncCallableService.java   | 124 ++++++++++++++
 .../ambari/server/topology/TopologyManager.java |  72 ++++----
 .../topology/AsyncCallableServiceTest.java      | 166 +++++++++++++++++++
 .../topology/ConfigureClusterTaskTest.java      | 129 ++++++++++++++
 .../server/topology/TopologyManagerTest.java    |  16 +-
 5 files changed, 474 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
new file mode 100644
index 0000000..fc7f190
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Callable service implementation for executing tasks asynchronously.
+ * The service repeatedly tries to execute the provided task till it 
successfully completes, or the provided timeout
+ * interval is exceeded.
+ *
+ * @param <T> the type returned by the task to be executed
+ */
+public class AsyncCallableService<T> implements Callable<Boolean> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncCallableService.class);
+
+  // task execution is done on a separate thread provided by this executor
+  private final ScheduledExecutorService executorService;
+
+  // the task to be executed
+  private final Callable<T> task;
+
+  // the total time the allowed for the task to be executed (retries will be 
happen within this timeframe in
+  // milliseconds)
+  private final long timeout;
+
+  // the delay between two consecutive execution trials in milliseconds
+  private final long delay;
+
+
+  private Boolean serviceResult = Boolean.FALSE;
+
+  public AsyncCallableService(Callable<T> task, long timeout, long delay,
+                              ScheduledExecutorService executorService) {
+    this.task = task;
+    this.executorService = executorService;
+    this.timeout = timeout;
+    this.delay = delay;
+  }
+
+  @Override
+  public Boolean call() {
+
+    long startTimeInMillis = Calendar.getInstance().getTimeInMillis();
+    LOG.info("Task execution started at: {}", startTimeInMillis);
+
+    // task execution started on a new thread
+    Future future = executorService.submit(task);
+
+    while (!taskCompleted(future)) {
+      if (!timeoutExceeded(startTimeInMillis)) {
+        LOG.debug("Retrying task execution in [ {} ] milliseconds.", delay);
+        future = executorService.schedule(task, delay, TimeUnit.MILLISECONDS);
+      } else {
+        LOG.debug("Timout exceeded, cancelling task ... ");
+        // making sure the task gets cancelled!
+        if (!future.isDone()) {
+          boolean cancelled = future.cancel(true);
+          LOG.debug("Task cancelled: {}", cancelled);
+        } else {
+          LOG.debug("Task already done.");
+        }
+        LOG.info("Timeout exceeded, task execution won't be retried!");
+        // exit the "retry" loop!
+        break;
+      }
+    }
+
+    LOG.info("Exiting Async task execution with the result: [ {} ]", 
serviceResult);
+    return serviceResult;
+  }
+
+  private boolean taskCompleted(Future<T> future) {
+    boolean completed = false;
+    try {
+      LOG.debug("Retrieving task execution result ...");
+      // should receive task execution result within the configured timeout 
interval
+      // exceptions thrown from the task are propagated here
+      T taskResult = future.get(timeout, TimeUnit.MILLISECONDS);
+
+      // task failures are expected to be reportesd as exceptions
+      LOG.debug("Task successfully executed: {}", taskResult);
+      setServiceResult(Boolean.TRUE);
+      completed = true;
+    } catch (Exception e) {
+      // Future.isDone always true here!
+      LOG.info("Exception during task execution: ", e);
+    }
+    return completed;
+  }
+
+  private boolean timeoutExceeded(long startTimeInMillis) {
+    return timeout < Calendar.getInstance().getTimeInMillis() - 
startTimeInMillis;
+  }
+
+  private void setServiceResult(Boolean serviceResult) {
+    this.serviceResult = serviceResult;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 9e902a7..9b6c9ad 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -71,6 +71,9 @@ public class TopologyManager {
   public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
   public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential";
 
+  private static final String CLUSTER_ENV_CONFIG_TYPE_NAME = "cluster-env";
+  private static final String 
CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = 
"cluster_configure_task_timeout";
+
   private PersistedState persistedState;
   private ExecutorService executor = Executors.newSingleThreadExecutor();
   private Collection<String> hostsToIgnore = new HashSet<String>();
@@ -675,54 +678,65 @@ public class TopologyManager {
    * @param configurationRequest  configuration request to be executed
    */
   private void addClusterConfigRequest(ClusterTopology topology, 
ClusterConfigurationRequest configurationRequest) {
-    executor.execute(new ConfigureClusterTask(topology, configurationRequest));
+
+    String timeoutStr = 
topology.getConfiguration().getPropertyValue(CLUSTER_ENV_CONFIG_TYPE_NAME,
+        CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME);
+
+    long timeout = 1000 * 60 * 30; // 30 minutes
+    long delay = 100; //ms
+
+    if (timeoutStr != null) {
+      timeout = Long.parseLong(timeoutStr);
+      LOG.debug("ConfigureClusterTask timeout set to: {}", timeout);
+    } else {
+      LOG.debug("No timeout constraints found in configuration. Wired defaults 
will be applied.");
+    }
+
+    ConfigureClusterTask configureClusterTask = new 
ConfigureClusterTask(topology, configurationRequest);
+    AsyncCallableService<Boolean> asyncCallableService = new 
AsyncCallableService(configureClusterTask, timeout, delay,
+        Executors.newScheduledThreadPool(1));
+
+    executor.submit(asyncCallableService);
   }
 
-  private class ConfigureClusterTask implements Runnable {
+  // package protected for testing purposes
+  static class ConfigureClusterTask implements Callable<Boolean> {
+
     private ClusterConfigurationRequest configRequest;
     private ClusterTopology topology;
 
-
     public ConfigureClusterTask(ClusterTopology topology, 
ClusterConfigurationRequest configRequest) {
       this.configRequest = configRequest;
       this.topology = topology;
     }
 
     @Override
-    public void run() {
+    public Boolean call() throws Exception {
       LOG.info("TopologyManager.ConfigureClusterTask: Entering");
 
-      boolean completed = false;
-      boolean interrupted = false;
-
       Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
-      while (!completed && !interrupted) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          interrupted = true;
-          LOG.info("TopologyManager.ConfigureClusterTask: waiting thread 
interrupted by exception", e);
-          // reset interrupted flag on thread
-          Thread.interrupted();
-        }
-        completed = areRequiredHostGroupsResolved(requiredHostGroups);
-      }
 
-      LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups 
are completed, Cluster Configuration can now begin");
+      if (!areRequiredHostGroupsResolved(requiredHostGroups)) {
+        LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for 
config request processing not yet " +
+            "satisfied");
+        throw new 
IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites 
for config " +
+            "request processing not yet  satisfied");
+      }
 
-      if (!interrupted) {
-        try {
-          LOG.info("TopologyManager.ConfigureClusterTask: Setting 
Configuration on cluster");
-          // sets updated configuration on topology and cluster
+      try {
+          LOG.info("TopologyManager.ConfigureClusterTask: All Required host 
groups are completed, Cluster " +
+              "Configuration can now begin");
           configRequest.process();
         } catch (Exception e) {
-          // just logging and allowing config flag to be reset
           LOG.error("TopologyManager.ConfigureClusterTask: " +
-              "An exception occurred while attempting to process cluster 
configs and set on cluster: " + e);
-          e.printStackTrace();
-        }
+              "An exception occurred while attempting to process cluster 
configs and set on cluster: ", e);
+
+        // this will signal an unsuccessful run, retry will be triggered if 
required
+        throw new Exception(e);
       }
+
       LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
+      return true;
     }
 
     /**
@@ -736,8 +750,8 @@ public class TopologyManager {
         requiredHostGroups = configRequest.getRequiredHostGroups();
       } catch (RuntimeException e) {
         // just log error and allow config topology update
-        LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred 
while attempting to determine required host groups for config update " + e);
-        e.printStackTrace();
+        LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred 
while attempting to determine required" +
+            " host groups for config update ", e);
         requiredHostGroups = Collections.emptyList();
       }
       return requiredHostGroups;

http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
new file mode 100644
index 0000000..00d6fe4
--- /dev/null
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import org.easymock.EasyMockRule;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class AsyncCallableServiceTest {
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncCallableService.class);
+
+  @Rule
+  public EasyMockRule mocks = new EasyMockRule(this);
+
+  @Mock(type = MockType.STRICT)
+  private Callable<Boolean> taskMock;
+
+  private ExecutorService singleThreadedExecutor = 
Executors.newSingleThreadExecutor();
+
+  private long timeout;
+
+  private long delay;
+
+  private AsyncCallableService<Boolean> asyncCallableService;
+
+  @Before
+  public void setup() {
+    // default timeout, overwrite it if necessary
+    timeout = 1000;
+
+    // default delay between tries
+    delay = 500;
+  }
+
+
+  @Test
+  public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws 
Exception {
+    // GIVEN
+    // the task to be executed never completes successfully
+    expect(taskMock.call()).andThrow(new IllegalStateException("Prerequisites 
are not yet satisfied!")).anyTimes();
+    replay(taskMock);
+    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, 
Executors.newScheduledThreadPool(1));
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    verify();
+    Assert.assertNotNull("Service result must not be null", serviceResult);
+    Assert.assertFalse("The expected boolean result is 'false'!", 
serviceResult);
+  }
+
+  @Test
+  public void 
testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws 
Exception {
+    // GIVEN
+    //the task call hangs, it doesn't return within a reasonable period of time
+    Callable<Boolean> hangingTask = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        Thread.sleep(10000000);
+        return false;
+      }
+    };
+
+    asyncCallableService = new AsyncCallableService(hangingTask, timeout, 
delay, Executors.newScheduledThreadPool(2));
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    Assert.assertNotNull("Service result must not be null", serviceResult);
+    Assert.assertFalse("The expected boolean result is 'false'!", 
serviceResult);
+  }
+
+  @Test
+  public void testCallableServiceShouldExitWhenTaskCompleted() throws 
Exception {
+    // GIVEN
+    // the task to be executed never completes successfully
+    expect(taskMock.call()).andReturn(Boolean.TRUE).times(1);
+    replay(taskMock);
+    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, 
Executors.newScheduledThreadPool(2));
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    verify();
+    Assert.assertNotNull("Service result must not be null", serviceResult);
+    Assert.assertTrue(serviceResult);
+  }
+
+  @Test
+  public void 
testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException()
 throws Exception {
+    // GIVEN
+
+    // the task to be throws exception
+    expect(taskMock.call()).andThrow(new 
IllegalStateException("****************** TESTING ****************")).times
+        (2,3);
+    replay(taskMock);
+    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, 
Executors.newScheduledThreadPool(2));
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    verify();
+    // THEN
+    Assert.assertNotNull("Service result must not be null", serviceResult);
+    Assert.assertFalse("The expected boolean result is 'false'!", 
serviceResult);
+
+  }
+
+
+  @Test
+  public void 
testShouldAsyncCallableServiceRetryExecutionWhenTaskThrowsException() throws 
Exception {
+    // GIVEN
+    //the task call hangs, it doesn't return within a reasonable period of time
+    Callable<Boolean> hangingTask = new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        throw new IllegalStateException("****************** TESTING 
****************");
+      }
+    };
+
+    asyncCallableService = new AsyncCallableService(hangingTask, timeout, 
delay, Executors.newScheduledThreadPool(2));
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    Assert.assertNotNull("Service result must not be null", serviceResult);
+    Assert.assertFalse("The expected boolean result is 'false'!", 
serviceResult);
+
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
new file mode 100644
index 0000000..7bfa591
--- /dev/null
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.topology;
+
+import junit.framework.Assert;
+import org.easymock.EasyMockRule;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+
+/**
+ * Unit test for the ConfigureClusterTask class.
+ * As business methods of this class don't return values, the assertions are 
made by verifying method calls on mocks.
+ * Thus having strict mocks is essential!
+ */
+public class ConfigureClusterTaskTest {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigureClusterTaskTest.class);
+
+  @Rule
+  public EasyMockRule mocks = new EasyMockRule(this);
+
+  @Mock(type = MockType.STRICT)
+  private ClusterConfigurationRequest clusterConfigurationRequest;
+
+  @Mock(type = MockType.STRICT)
+  private ClusterTopology clusterTopology;
+
+  private TopologyManager.ConfigureClusterTask testSubject;
+
+
+  @Before
+  public void before() {
+    reset(clusterConfigurationRequest, clusterTopology);
+    testSubject = new TopologyManager.ConfigureClusterTask(clusterTopology, 
clusterConfigurationRequest);
+
+  }
+
+  @Test
+  public void 
testShouldConfigureClusterTaskLogicBeExecutedWhenRequiredHostgroupsAreResolved()
 throws
+      Exception {
+    // GIVEN
+    // is it OK to handle the non existence of hostgroups as a success?!
+    
expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
+    
expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
+
+    // this is only called if the "prerequisites" are satisfied
+    clusterConfigurationRequest.process();
+
+    replay(clusterConfigurationRequest, clusterTopology);
+
+    // WHEN
+    Boolean result = testSubject.call();
+
+    // THEN
+    verify();
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void 
testsShouldConfigureClusterTaskExecuteWhenCalledFromAsyncCallableService() 
throws Exception {
+    // GIVEN
+    // is it OK to handle the non existence of hostgroups as a success?!
+    
expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
+    
expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
+
+    // this is only called if the "prerequisites" are satisfied
+    clusterConfigurationRequest.process();
+
+    replay(clusterConfigurationRequest, clusterTopology);
+
+    AsyncCallableService<Boolean> asyncService = new 
AsyncCallableService<>(testSubject, 5000, 500, Executors
+        .newScheduledThreadPool(3));
+
+    // WHEN
+    asyncService.call();
+    // THEN
+
+
+  }
+
+  private Collection<String> mockRequiredHostGroups() {
+    return Arrays.asList("test-hostgroup-1");
+  }
+
+  private Map<String, HostGroupInfo> mockHostGroupInfo() {
+    Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
+    HostGroupInfo hostGroupInfo = new HostGroupInfo("test-hostgroup-1");
+    hostGroupInfo.addHost("test-host-1");
+    hostGroupInfo.setRequestedCount(2);
+
+    hostGroupInfoMap.put("test-hostgroup-1", hostGroupInfo);
+    return hostGroupInfoMap;
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/d8e97b17/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 3326bd4..47169f4 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -48,6 +48,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
@@ -117,6 +118,10 @@ public class TopologyManagerTest {
   private ClusterController clusterController;
   @Mock(type = MockType.STRICT)
   private ResourceProvider resourceProvider;
+
+  @Mock(type = MockType.STRICT)
+  private Future mockFuture;
+
   private final Configuration stackConfig = new Configuration(new 
HashMap<String, Map<String, String>>(),
       new HashMap<String, Map<String, Map<String, String>>>());
   private final Configuration bpConfiguration = new Configuration(new 
HashMap<String, Map<String, String>>(),
@@ -295,7 +300,8 @@ public class TopologyManagerTest {
 
     
expect(clusterController.ensureResourceProvider(anyObject(Resource.Type.class))).andReturn(resourceProvider);
 
-    executor.execute(capture(updateConfigTaskCapture));
+    
expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture);
+
     expectLastCall().times(1);
 
     
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
@@ -305,7 +311,8 @@ public class TopologyManagerTest {
 
     replay(blueprint, stack, request, group1, group2, ambariContext, 
logicalRequestFactory, logicalRequest,
         configurationRequest, configurationRequest2, configurationRequest3, 
requestStatusResponse, executor,
-        persistedState, securityConfigurationFactory, credentialStoreService, 
clusterController, resourceProvider);
+        persistedState, securityConfigurationFactory, credentialStoreService, 
clusterController, resourceProvider,
+        mockFuture);
 
     Class clazz = TopologyManager.class;
 
@@ -321,10 +328,11 @@ public class TopologyManagerTest {
   public void tearDown() {
     verify(blueprint, stack, request, group1, group2, ambariContext, 
logicalRequestFactory,
         logicalRequest, configurationRequest, configurationRequest2, 
configurationRequest3,
-        requestStatusResponse, executor, persistedState);
+        requestStatusResponse, executor, persistedState, mockFuture);
+
     reset(blueprint, stack, request, group1, group2, ambariContext, 
logicalRequestFactory,
         logicalRequest, configurationRequest, configurationRequest2, 
configurationRequest3,
-        requestStatusResponse, executor, persistedState);
+        requestStatusResponse, executor, persistedState, mockFuture);
   }
 
   @Test

Reply via email to