Updated Branches:
  refs/heads/helix-provisioning 180aafe5b -> 27f627265


Provisioner returns ListenableFutures


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/27f62726
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/27f62726
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/27f62726

Branch: refs/heads/helix-provisioning
Commit: 27f627265713bbb93535e8d5ff0aee57c7ae46e9
Parents: 180aafe
Author: Kanak Biscuitwala <[email protected]>
Authored: Thu Jan 9 18:02:36 2014 -0800
Committer: Kanak Biscuitwala <[email protected]>
Committed: Thu Jan 9 18:02:36 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ContainerProvider.java          |  10 +-
 .../controller/provisioner/ContainerState.java  |   4 +-
 .../stages/ContainerProvisioningStage.java      | 150 +++++++++++++------
 .../integration/TestLocalContainerProvider.java |  28 +++-
 .../yarn/GenericApplicationMaster.java          |  15 +-
 .../provisioning/yarn/YarnProvisioner.java      |  79 +++++-----
 6 files changed, 181 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
index 2dee697..c88733f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
@@ -1,5 +1,7 @@
 package org.apache.helix.controller.provisioner;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,13 +23,13 @@ package org.apache.helix.controller.provisioner;
 
 public interface ContainerProvider {
 
-  ContainerId allocateContainer(ContainerSpec spec);
+  ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec);
 
-  boolean deallocateContainer(ContainerId containerId);
+  ListenableFuture<Boolean> deallocateContainer(ContainerId containerId);
 
-  boolean startContainer(ContainerId containerId);
+  ListenableFuture<Boolean> startContainer(ContainerId containerId);
 
-  boolean stopContainer(ContainerId containerId);
+  ListenableFuture<Boolean> stopContainer(ContainerId containerId);
 
   ContainerState getContainerState(ContainerId containerId);
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
 
b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index c2e5649..cf4b736 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -25,8 +25,8 @@ public enum ContainerState {
   CONNECTING,
   ACTIVE,
   TEARDOWN,
-  FAILED,
   HALTED,
   FINALIZING,
-  FINALIZED
+  FINALIZED,
+  FAILED
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index c4cc5d8..97b80b9 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -44,6 +44,10 @@ import 
org.apache.helix.controller.provisioner.TargetProviderResponse;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.log4j.Logger;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * This stage will manager the container allocation/deallocation needed for a
  * specific resource.<br/>
@@ -61,12 +65,12 @@ public class ContainerProvisioningStage extends 
AbstractBaseStage {
 
   @Override
   public void process(ClusterEvent event) throws Exception {
-    HelixManager helixManager = event.getAttribute("helixmanager");
-    Map<ResourceId, ResourceConfig> resourceMap =
+    final HelixManager helixManager = event.getAttribute("helixmanager");
+    final Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
-    HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
-    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    final HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     for (ResourceId resourceId : resourceMap.keySet()) {
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       ProvisionerConfig provisionerConfig = 
resourceConfig.getProvisionerConfig();
@@ -89,8 +93,8 @@ public class ContainerProvisioningStage extends 
AbstractBaseStage {
           }
         }
 
-        Cluster cluster = event.getAttribute("ClusterDataCache");
-        Collection<Participant> participants = 
cluster.getParticipantMap().values();
+        final Cluster cluster = event.getAttribute("ClusterDataCache");
+        final Collection<Participant> participants = 
cluster.getParticipantMap().values();
 
         // Participants registered in helix
         // Give those participants to targetprovider
@@ -103,13 +107,13 @@ public class ContainerProvisioningStage extends 
AbstractBaseStage {
 
         // TargetProvider should be stateless, given the state of cluster and 
existing participants
         // it should return the same result
-        TargetProviderResponse response =
+        final TargetProviderResponse response =
             provisioner.evaluateExistingContainers(cluster, resourceId, 
participants);
 
         // allocate new containers
-        for (ContainerSpec spec : response.getContainersToAcquire()) {
+        for (final ContainerSpec spec : response.getContainersToAcquire()) {
           // random participant id
-          ParticipantId participantId = 
ParticipantId.from(UUID.randomUUID().toString());
+          final ParticipantId participantId = 
ParticipantId.from(UUID.randomUUID().toString());
           // create a new Participant, attach the container spec
           InstanceConfig instanceConfig = new InstanceConfig(participantId);
           instanceConfig.setContainerSpec(spec);
@@ -117,74 +121,128 @@ public class ContainerProvisioningStage extends 
AbstractBaseStage {
           instanceConfig.setContainerState(ContainerState.ACQUIRING);
           // create the helix participant and add it to cluster
           helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
-          ContainerId containerId = provisioner.allocateContainer(spec);
-          InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participantId.toString());
-          existingInstance.setContainerId(containerId);
-          existingInstance.setContainerState(ContainerState.ACQUIRED);
-          
accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
-              existingInstance);
+
+          ListenableFuture<ContainerId> future = 
provisioner.allocateContainer(spec);
+          Futures.addCallback(future, new FutureCallback<ContainerId>() {
+            @Override
+            public void onSuccess(ContainerId containerId) {
+              InstanceConfig existingInstance =
+                  helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participantId.toString());
+              existingInstance.setContainerId(containerId);
+              existingInstance.setContainerState(ContainerState.ACQUIRED);
+              
accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+                  existingInstance);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              LOG.error("Could not allocate a container for participant " + 
participantId, t);
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participantId,
+                  ContainerState.FAILED);
+            }
+          });
         }
 
         // start new containers
-        for (Participant participant : response.getContainersToStart()) {
-          InstanceConfig existingInstance =
+        for (final Participant participant : response.getContainersToStart()) {
+          final InstanceConfig existingInstance =
               helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
                   .toString());
-          ContainerId containerId = existingInstance.getContainerId();
+          final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerId(containerId);
           existingInstance.setContainerState(ContainerState.CONNECTING);
           
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
-          provisioner.startContainer(containerId);
-          existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
-                  .toString());
-          existingInstance.setContainerState(ContainerState.ACTIVE);
-          
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
-              existingInstance);
+          ListenableFuture<Boolean> future = 
provisioner.startContainer(containerId);
+          Futures.addCallback(future, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participant.getId(),
+                  ContainerState.ACTIVE);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              LOG.error("Could not start container" + containerId + "for 
participant "
+                  + participant.getId(), t);
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participant.getId(),
+                  ContainerState.FAILED);
+            }
+          });
         }
 
         // release containers
-        for (Participant participant : response.getContainersToRelease()) {
-          // this will change the container state
-          InstanceConfig existingInstance =
+        for (final Participant participant : 
response.getContainersToRelease()) {
+          // mark it as finalizing
+          final InstanceConfig existingInstance =
               helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
                   .toString());
-          ContainerId containerId = existingInstance.getContainerId();
+          final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerState(ContainerState.FINALIZING);
           
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
-          provisioner.deallocateContainer(containerId);
           // remove the participant
-          existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
-                  .toString());
-          helixAdmin.dropInstance(cluster.getId().toString(), 
existingInstance);
+          ListenableFuture<Boolean> future = 
provisioner.deallocateContainer(containerId);
+          Futures.addCallback(future, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+              InstanceConfig existingInstance =
+                  helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
+                      .toString());
+              helixAdmin.dropInstance(cluster.getId().toString(), 
existingInstance);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              LOG.error("Could not deallocate container" + containerId + "for 
participant "
+                  + participant.getId(), t);
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participant.getId(),
+                  ContainerState.FAILED);
+            }
+          });
         }
 
         // stop but don't remove
-        for (Participant participant : response.getContainersToStop()) {
+        for (final Participant participant : response.getContainersToStop()) {
           // disable the node first
-          InstanceConfig existingInstance =
+          final InstanceConfig existingInstance =
               helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
                   .toString());
-          ContainerId containerId = existingInstance.getContainerId();
+          final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setInstanceEnabled(false);
           existingInstance.setContainerState(ContainerState.TEARDOWN);
           
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
-          provisioner.stopContainer(containerId);
-          existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participant.getId()
-                  .toString());
-          existingInstance.setContainerState(ContainerState.HALTED);
-          
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
-              existingInstance);
+          ListenableFuture<Boolean> future = 
provisioner.stopContainer(containerId);
+          Futures.addCallback(future, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participant.getId(),
+                  ContainerState.HALTED);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              LOG.error(
+                  "Could not stop container" + containerId + "for participant "
+                      + participant.getId(), t);
+              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, 
participant.getId(),
+                  ContainerState.FAILED);
+            }
+          });
         }
       }
     }
   }
+
+  private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor 
accessor,
+      PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId 
participantId,
+      ContainerState state) {
+    InstanceConfig existingInstance =
+        helixAdmin.getInstanceConfig(cluster.getId().toString(), 
participantId.toString());
+    existingInstance.setContainerState(state);
+    accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), 
existingInstance);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 6439fed..7b8a580 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -62,6 +62,8 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 public class TestLocalContainerProvider extends ZkUnitTestBase {
   private static final int MAX_PARTICIPANTS = 10;
@@ -219,24 +221,28 @@ public class TestLocalContainerProvider extends 
ZkUnitTestBase {
     }
 
     @Override
-    public ContainerId allocateContainer(ContainerSpec spec) {
+    public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) 
{
       // allocation is a no-op
       ContainerId containerId = spec.getContainerId();
       _states.put(containerId, ContainerState.ACQUIRED);
       allocated++;
-      return containerId;
+      SettableFuture<ContainerId> future = SettableFuture.create();
+      future.set(containerId);
+      return future;
     }
 
     @Override
-    public boolean deallocateContainer(ContainerId containerId) {
+    public ListenableFuture<Boolean> deallocateContainer(ContainerId 
containerId) {
       // deallocation is a no-op
       _states.put(containerId, ContainerState.FINALIZED);
       deallocated++;
-      return true;
+      SettableFuture<Boolean> future = SettableFuture.create();
+      future.set(true);
+      return future;
     }
 
     @Override
-    public boolean startContainer(ContainerId containerId) {
+    public ListenableFuture<Boolean> startContainer(ContainerId containerId) {
       ParticipantService participant =
           new ParticipantService(_clusterId, 
_containerParticipants.get(containerId));
       participant.startAsync();
@@ -244,17 +250,21 @@ public class TestLocalContainerProvider extends 
ZkUnitTestBase {
       _participants.put(containerId, participant);
       _states.put(containerId, ContainerState.ACTIVE);
       started++;
-      return true;
+      SettableFuture<Boolean> future = SettableFuture.create();
+      future.set(true);
+      return future;
     }
 
     @Override
-    public boolean stopContainer(ContainerId containerId) {
+    public ListenableFuture<Boolean> stopContainer(ContainerId containerId) {
       ParticipantService participant = _participants.get(containerId);
       participant.stopAsync();
       participant.awaitTerminated();
       _states.put(containerId, ContainerState.HALTED);
       stopped++;
-      return true;
+      SettableFuture<Boolean> future = SettableFuture.create();
+      future.set(true);
+      return future;
     }
 
     @Override
@@ -298,6 +308,8 @@ public class TestLocalContainerProvider extends 
ZkUnitTestBase {
             // halted containers can be released
             containersToRelease.add(participant);
             break;
+          default:
+            break;
           }
           ContainerId containerId = containerConfig.getId();
           if (containerId != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git 
a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
 
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index d3f410f..3b4e937 100644
--- 
a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ 
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -30,10 +30,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,7 +46,6 @@ import 
org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -61,9 +56,9 @@ import 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
 /**
@@ -237,26 +232,26 @@ public class GenericApplicationMaster {
   }
 
 
-  public Future<ContainerAskResponse> acquireContainer(ContainerRequest 
containerAsk) {
+  public ListenableFuture<ContainerAskResponse> 
acquireContainer(ContainerRequest containerAsk) {
     amRMClient.addContainerRequest(containerAsk);
     numRequestedContainers.incrementAndGet();
     SettableFuture<ContainerAskResponse> future = SettableFuture.create();
     return future;
   }
 
-  public Future<ContainerStopResponse> stopContainer(Container container) {
+  public ListenableFuture<ContainerStopResponse> stopContainer(Container 
container) {
     nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
     SettableFuture<ContainerStopResponse> future = SettableFuture.create();
     return future;
   }
 
-  public Future<ContainerReleaseResponse> releaseContainer(Container 
container) {
+  public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container 
container) {
     amRMClient.releaseAssignedContainer(container.getId());
     SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
     return future;
   }
 
-  public Future<ContainerLaunchResponse> launchContainer(Container container,
+  public ListenableFuture<ContainerLaunchResponse> launchContainer(Container 
container,
       ContainerLaunchContext containerLaunchContext) {
     nmClientAsync.startContainerAsync(container, containerLaunchContext);
     SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();

http://git-wip-us.apache.org/repos/asf/helix/blob/27f62726/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git 
a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
 
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index bfaa209..e921c87 100644
--- 
a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ 
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -3,8 +3,7 @@ package org.apache.helix.provisioning.yarn;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -24,60 +23,70 @@ import 
org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.Provisioner;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
 
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 public class YarnProvisioner implements Provisioner {
 
   private static final Log LOG = LogFactory.getLog(YarnProvisioner.class);
   static GenericApplicationMaster applicationMaster;
+  static ListeningExecutorService service = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
   Map<ContainerId, Container> allocatedContainersMap = new 
HashMap<ContainerId, Container>();
 
   @Override
-  public ContainerId allocateContainer(ContainerSpec spec) {
+  public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
     ContainerRequest containerAsk = setupContainerAskForRM(spec);
-    Future<ContainerAskResponse> requestNewContainer =
+    ListenableFuture<ContainerAskResponse> requestNewContainer =
         applicationMaster.acquireContainer(containerAsk);
-    ContainerAskResponse containerAskResponse;
-    try {
-      containerAskResponse = requestNewContainer.get();
-      ContainerId helixContainerId =
-          
ContainerId.from(containerAskResponse.getContainer().getId().toString());
-      allocatedContainersMap.put(helixContainerId, 
containerAskResponse.getContainer());
-      return helixContainerId;
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      e.printStackTrace();
-    }
-    return null;
+    return Futures.transform(requestNewContainer, new 
Function<ContainerAskResponse, ContainerId>() {
+      @Override
+      public ContainerId apply(ContainerAskResponse containerAskResponse) {
+        ContainerId helixContainerId =
+            
ContainerId.from(containerAskResponse.getContainer().getId().toString());
+        allocatedContainersMap.put(helixContainerId, 
containerAskResponse.getContainer());
+        return helixContainerId;
+      }
+    });
   }
 
   @Override
-  public boolean deallocateContainer(ContainerId containerId) {
-    Future<ContainerReleaseResponse> releaseContainer =
+  public ListenableFuture<Boolean> deallocateContainer(ContainerId 
containerId) {
+    ListenableFuture<ContainerReleaseResponse> releaseContainer =
         
applicationMaster.releaseContainer(allocatedContainersMap.get(containerId));
-    try {
-      releaseContainer.get();
-      return true;
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return false;
+    return Futures.transform(releaseContainer, new 
Function<ContainerReleaseResponse, Boolean>() {
+      @Override
+      public Boolean apply(ContainerReleaseResponse response) {
+        return response != null;
+      }
+    }, service);
   }
 
   @Override
-  public boolean startContainer(ContainerId containerId) {
+  public ListenableFuture<Boolean> startContainer(final ContainerId 
containerId) {
     Container container = allocatedContainersMap.get(containerId);
     ContainerLaunchContext containerLaunchContext = 
Records.newRecord(ContainerLaunchContext.class);
-    applicationMaster.launchContainer(container, containerLaunchContext);
-    return false;
+    ListenableFuture<ContainerLaunchResponse> future = 
applicationMaster.launchContainer(container, containerLaunchContext);
+    return Futures.transform(future, new Function<ContainerLaunchResponse, 
Boolean>() {
+      @Override
+      public Boolean apply(ContainerLaunchResponse response) {
+        return response != null;
+      }
+    }, service);
   }
 
   @Override
-  public boolean stopContainer(ContainerId containerId) {
-    return false;
+  public ListenableFuture<Boolean> stopContainer(final ContainerId 
containerId) {
+    Container container = allocatedContainersMap.get(containerId);
+    ListenableFuture<ContainerStopResponse> future = 
applicationMaster.stopContainer(container);
+    return Futures.transform(future, new Function<ContainerStopResponse, 
Boolean>() {
+      @Override
+      public Boolean apply(ContainerStopResponse response) {
+        return response != null;
+      }
+    }, service);
   }
 
   @Override

Reply via email to