Repository: twill
Updated Branches:
  refs/heads/site 7bdf857c0 -> 3680e00f0


(TWILL-186) Guard against the case where YARN returns a container of mismatched size.

- Also make sure we don't remove container request without adding it first
- Code cleanup for ApplicationMasterService and related classes
  - Get rid of the inner loop in the doRun method
    - The inner loop can block the heartbeat thread for too long if there are a 
lot of runnable instances to stop
  - Remove unnecessary Throwables.propagate
  - Remove unnecessary intermediate method
  - Better logging
  - Request multiple instances in the same request
  - Refactor/simplify placement policy related code
  - Expose container instanceId instead of parsing it from runId

This closes #34 on Github.

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f4df32da
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f4df32da
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f4df32da

Branch: refs/heads/site
Commit: f4df32da26cbc7e4fb93f88fb0b1306575a9cf0e
Parents: 140f7da
Author: Terence Yim <[email protected]>
Authored: Mon Feb 27 21:22:32 2017 -0800
Committer: Terence Yim <[email protected]>
Committed: Wed Mar 1 16:18:51 2017 -0800

----------------------------------------------------------------------
 .../internal/TwillContainerController.java      |  10 +-
 .../twill/internal/TwillContainerLauncher.java  |   5 +
 .../appmaster/ApplicationMasterService.java     | 198 ++++++++-----------
 .../internal/appmaster/ExpectedContainers.java  |  13 +-
 .../appmaster/PlacementPolicyManager.java       |  77 ++------
 .../appmaster/RunnableContainerRequest.java     |   6 +-
 .../internal/appmaster/RunningContainers.java   |  11 +-
 .../internal/yarn/AbstractYarnAMClient.java     |  50 ++---
 .../twill/internal/yarn/YarnAMClient.java       |  40 +++-
 9 files changed, 186 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
index 692e6b2..15689f5 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
@@ -22,6 +22,8 @@ import com.google.common.util.concurrent.Service;
 import org.apache.twill.api.ServiceController;
 import org.apache.twill.internal.state.Message;
 
+import javax.annotation.Nullable;
+
 /**
  * A {@link ServiceController} that allows sending a message directly. 
Internal use only.
  */
@@ -36,7 +38,13 @@ public interface TwillContainerController extends 
ServiceController, Service {
   void completed(int exitStatus);
 
   /**
-   * @returns the container's live node data.
+   * @return the container's live node data.
    */
+  @Nullable
   ContainerLiveNodeData getLiveNodeData();
+
+  /**
+   * @return the instance ID of the runnable that running in the container 
controlled by this controller.
+   */
+  int getInstanceId();
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 8dce91e..9b6384c 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -262,6 +262,11 @@ public final class TwillContainerLauncher {
       processController.cancel();
     }
 
+    @Override
+    public int getInstanceId() {
+      return instanceId;
+    }
+
     private void killAndWait(int maxWaitSecs) {
       Stopwatch watch = new Stopwatch();
       watch.start();

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index d5beb69..b4ac288 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
 import com.google.common.collect.DiscreteDomains;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableSet;
@@ -160,8 +159,8 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
                                                         
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
                                                         
amClient.getContainerId().toString(), getLocalizeFiles());
 
-    this.expectedContainers = initExpectedContainers(twillSpec);
-    this.runningContainers = initRunningContainers(amClient.getContainerId(), 
amClient.getHost());
+    this.expectedContainers = new ExpectedContainers(twillSpec);
+    this.runningContainers = 
createRunningContainers(amClient.getContainerId(), amClient.getHost());
     this.eventHandler = createEventHandler(twillSpec);
   }
 
@@ -180,26 +179,22 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
 
   @SuppressWarnings("unchecked")
   @Nullable
-  private EventHandler createEventHandler(TwillSpecification twillSpec) {
-    try {
-      // Should be able to load by this class ClassLoader, as they packaged in 
the same jar.
-      EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
-      if (handlerSpec == null) {
-        return null;
-      }
-
-      Class<?> handlerClass = 
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
-      
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
-                                  "Class {} does not implements {}",
-                                  handlerClass, EventHandler.class.getName());
-      return Instances.newInstance((Class<? extends EventHandler>) 
handlerClass);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
+  private EventHandler createEventHandler(TwillSpecification twillSpec) throws 
ClassNotFoundException {
+    // Should be able to load by this class ClassLoader, as they packaged in 
the same jar.
+    EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+    if (handlerSpec == null) {
+      return null;
     }
+
+    Class<?> handlerClass = 
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
+    
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
+                                "Class {} does not implements {}",
+                                handlerClass, EventHandler.class.getName());
+    return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
   }
 
-  private RunningContainers initRunningContainers(ContainerId 
appMasterContainerId,
-                                                  String appMasterHost) throws 
Exception {
+  private RunningContainers createRunningContainers(ContainerId 
appMasterContainerId,
+                                                    String appMasterHost) 
throws Exception {
     TwillRunResources appMasterResources = new DefaultTwillRunResources(
       0,
       appMasterContainerId.toString(),
@@ -211,14 +206,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       twillSpec.getRunnables(), twillRuntimeSpec.getMaxRetries());
   }
 
-  private ExpectedContainers initExpectedContainers(TwillSpecification 
twillSpec) {
-    Map<String, Integer> expectedCounts = Maps.newHashMap();
-    for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values()) 
{
-      expectedCounts.put(runtimeSpec.getName(), 
runtimeSpec.getResourceSpecification().getInstances());
-    }
-    return new ExpectedContainers(expectedCounts);
-  }
-
   @Override
   public ResourceReport get() {
     return runningContainers.getResourceReport();
@@ -392,8 +379,14 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     boolean isRequestRelaxed = false;
     long nextTimeoutCheck = System.currentTimeMillis() + 
Constants.PROVISION_TIMEOUT;
     while (!stopped) {
-      // Call allocate. It has to be made at first in order to be able to get 
cluster resource availability.
-      amClient.allocate(0.0f, allocateHandler);
+      TimeUnit.SECONDS.sleep(1);
+
+      try {
+        // Call allocate. It has to be made at first in order to be able to 
get cluster resource availability.
+        amClient.allocate(0.0f, allocateHandler);
+      } catch (Exception e) {
+        LOG.warn("Exception raised when making heartbeat to RM. Will be 
retried in next heartbeat.", e);
+      }
 
       // Looks for containers requests.
       if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && 
runningContainers.isEmpty()) {
@@ -402,28 +395,22 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       }
 
       // If nothing is in provisioning, and no pending request, move to next 
one
-      int count = runnableContainerRequests.size();
-      LOG.debug("Runnable container requests: {}", count);
-      while (provisioning.isEmpty() && currentRequest == null && 
!runnableContainerRequests.isEmpty()) {
-        RunnableContainerRequest runnableContainerRequest = 
runnableContainerRequests.peek();
-        if (!runnableContainerRequest.isReadyToBeProvisioned()) {
-          // take it out from queue and put it back at the end for second 
chance.
-          runnableContainerRequest = runnableContainerRequests.poll();
-          runnableContainerRequests.add(runnableContainerRequest);
-          LOG.debug("Request not ready: {}", runnableContainerRequest);
-
-          // We checked all the requests that were pending when we started 
this loop
-          // Any remaining requests are not ready to be provisioned
-          if (--count <= 0) {
-            break;
-          }
+      if (provisioning.isEmpty() && currentRequest == null && 
!runnableContainerRequests.isEmpty()) {
+        RunnableContainerRequest containerRequest = 
runnableContainerRequests.peek();
+        // If the request at the head of the request queue is not yet ready, 
move it to the end of the queue
+        // so that it won't block requests that are already ready
+        if (!containerRequest.isReadyToBeProvisioned()) {
+          LOG.debug("Request not ready: {}", containerRequest);
+          runnableContainerRequests.add(runnableContainerRequests.poll());
           continue;
         }
-        currentRequest = runnableContainerRequest.takeRequest();
+
+        currentRequest = containerRequest.takeRequest();
         if (currentRequest == null) {
           // All different types of resource request from current order is 
done, move to next one
           // TODO: Need to handle order type as well
           runnableContainerRequests.poll();
+          continue;
         }
       }
 
@@ -448,10 +435,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       }
 
       nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
-
-      if (isRunning()) {
-        TimeUnit.SECONDS.sleep(1);
-      }
     }
   }
 
@@ -462,28 +445,27 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     amClient.clearBlacklist();
 
     //Check the allocation strategy
-    AllocationSpecification currentAllocationSpecification = request.getKey();
-    if 
(currentAllocationSpecification.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME))
 {
-
-      //Check the placement policy
-      TwillSpecification.PlacementPolicy placementPolicy =
-        
placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
-      if (placementPolicy != null
-        && 
placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED))
 {
-
-        //Update blacklist with hosts which are running DISTRIBUTED runnables
-        for (String runnable : 
placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
-          Collection<ContainerInfo> containerStats =
-            runningContainers.getContainerInfo(runnable);
-          for (ContainerInfo containerInfo : containerStats) {
-            // Yarn Resource Manager may include port in the node name 
depending on the setting
-            // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is 
safe to add both
-            // the names (with and without port) to the blacklist.
-            LOG.debug("Adding {} to host blacklist", 
containerInfo.getHost().getHostName());
-            amClient.addToBlacklist(containerInfo.getHost().getHostName());
-            amClient.addToBlacklist(containerInfo.getHost().getHostName() + 
":" + containerInfo.getPort());
-          }
-        }
+    AllocationSpecification allocationSpec = request.getKey();
+    if 
(!allocationSpec.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME))
 {
+      return;
+    }
+
+    //Check the placement policy
+    String runnableName = allocationSpec.getRunnableName();
+    TwillSpecification.PlacementPolicy placementPolicy = 
placementPolicyManager.getPlacementPolicy(runnableName);
+    if (placementPolicy == null || placementPolicy.getType() != 
TwillSpecification.PlacementPolicy.Type.DISTRIBUTED) {
+      return;
+    }
+
+    //Update blacklist with hosts which are running DISTRIBUTED runnables
+    for (String runnable : placementPolicy.getNames()) {
+      for (ContainerInfo containerInfo : 
runningContainers.getContainerInfo(runnable)) {
+        // Yarn Resource Manager may include port in the node name depending 
on the setting
+        // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is 
safe to add both
+        // the names (with and without port) to the blacklist.
+        LOG.debug("Adding {} to host blacklist", 
containerInfo.getHost().getHostName());
+        amClient.addToBlacklist(containerInfo.getHost().getHostName());
+        amClient.addToBlacklist(containerInfo.getHost().getHostName() + ":" + 
containerInfo.getPort());
       }
     }
   }
@@ -501,9 +483,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
 
     for (Multiset.Entry<String> entry : restartRunnables.entrySet()) {
       LOG.info("Re-request container for {} with {} instances.", 
entry.getElement(), entry.getCount());
-      for (int i = 0; i < entry.getCount(); i++) {
-        
runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement()));
-      }
+      
runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement(),
  entry.getCount()));
     }
 
     // For all runnables that needs to re-request for containers, update the 
expected count timestamp
@@ -584,10 +564,10 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     Queue<RunnableContainerRequest> requests = new ConcurrentLinkedQueue<>();
     // For each order in the twillSpec, create container request for 
runnables, depending on Placement policy.
     for (TwillSpecification.Order order : twillSpec.getOrders()) {
-      Set<String> distributedRunnables = 
placementPolicyManager.getDistributedRunnables(order.getNames());
-      Set<String> defaultRunnables = Sets.newHashSet();
-      defaultRunnables.addAll(order.getNames());
-      defaultRunnables.removeAll(distributedRunnables);
+      Set<String> distributedRunnables = 
Sets.intersection(placementPolicyManager.getDistributedRunnables(),
+                                                           order.getNames());
+      Set<String> defaultRunnables = Sets.difference(order.getNames(), 
distributedRunnables);
+
       Map<AllocationSpecification, Collection<RuntimeSpecification>> 
requestsMap = Maps.newHashMap();
       for (String runnableName : distributedRunnables) {
         RuntimeSpecification runtimeSpec = 
twillSpec.getRunnables().get(runnableName);
@@ -637,19 +617,19 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
           //Spawning 1 instance at a time
           newContainers = 1;
         }
+
+        // TODO: Allow user to set priority?
+        LOG.info("Request {} containers with capability {} for runnable {}", 
newContainers, capability, name);
+        YarnAMClient.ContainerRequestBuilder builder = 
amClient.addContainerRequest(capability, newContainers);
+        builder.setPriority(0);
+
         TwillSpecification.PlacementPolicy placementPolicy = 
placementPolicyManager.getPlacementPolicy(name);
-        Set<String> hosts = Sets.newHashSet();
-        Set<String> racks = Sets.newHashSet();
         if (placementPolicy != null) {
-          hosts = placementPolicy.getHosts();
-          racks = placementPolicy.getRacks();
+          builder.addHosts(placementPolicy.getHosts())
+                 .addRacks(placementPolicy.getRacks());
         }
-        // TODO: Allow user to set priority?
-        LOG.info("Request {} containers with capability {} for runnable {}", 
newContainers, capability, name);
-        String requestId = amClient.addContainerRequest(capability, 
newContainers)
-          .addHosts(hosts)
-          .addRacks(racks)
-          .setPriority(0).apply();
+
+        String requestId = builder.apply();
         provisioning.add(new ProvisionRequest(runtimeSpec, requestId, 
newContainers, allocationType));
       }
     }
@@ -661,14 +641,14 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   private void launchRunnable(List<? extends 
ProcessLauncher<YarnContainerInfo>> launchers,
                               Queue<ProvisionRequest> provisioning) {
     for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
-      LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
+      LOG.info("Container allocated: {}", 
processLauncher.getContainerInfo().getContainer());
       ProvisionRequest provisionRequest = provisioning.peek();
       if (provisionRequest == null) {
         continue;
       }
 
       String runnableName = provisionRequest.getRuntimeSpec().getName();
-      LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
+      LOG.info("Starting runnable {} in {}", runnableName, 
processLauncher.getContainerInfo().getContainer());
 
       LOG.debug("Log level for Twill runnable {} is {}", runnableName,
                 
twillRuntimeSpec.getLogLevels().get(runnableName).get(Logger.ROOT_LOGGER_NAME));
@@ -713,17 +693,15 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     }
   }
 
-  private List<LocalFile> getLocalizeFiles() {
+  private List<LocalFile> getLocalizeFiles() throws IOException {
     try (Reader reader = 
Files.newBufferedReader(Paths.get(Constants.Files.LOCALIZE_FILES), 
StandardCharsets.UTF_8)) {
       return new GsonBuilder().registerTypeAdapter(LocalFile.class, new 
LocalFileCodec())
         .create().fromJson(reader, new TypeToken<List<LocalFile>>() {
         }.getType());
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
     }
   }
 
-  private Map<String, Map<String, String>> getEnvironments() {
+  private Map<String, Map<String, String>> getEnvironments() throws 
IOException {
     Path envFile = Paths.get(Constants.Files.RUNTIME_CONFIG_JAR, 
Constants.Files.ENVIRONMENTS);
     if (!Files.exists(envFile)) {
       return new HashMap<>();
@@ -732,8 +710,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     try (Reader reader = Files.newBufferedReader(envFile, 
StandardCharsets.UTF_8)) {
       return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, 
String>>>() {
       }.getType());
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
     }
   }
 
@@ -823,10 +799,6 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     };
   }
 
-  private RunnableContainerRequest createRunnableContainerRequest(final String 
runnableName) {
-    return createRunnableContainerRequest(runnableName, 1);
-  }
-
   private RunnableContainerRequest createRunnableContainerRequest(final String 
runnableName,
                                                                   final int 
numberOfInstances) {
     return createRunnableContainerRequest(runnableName, numberOfInstances, 
true);
@@ -846,8 +818,8 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     RuntimeSpecification runtimeSpec = 
twillSpec.getRunnables().get(runnableName);
     Resource capability = 
createCapability(runtimeSpec.getResourceSpecification());
     Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap 
= Maps.newHashMap();
-    if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
-      TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+
+    if 
(placementPolicyManager.getDistributedRunnables().contains(runnableName)) {
       for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
         AllocationSpecification allocationSpecification =
           new AllocationSpecification(capability, 
AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
@@ -938,18 +910,9 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   /**
    * Helper method to restart instances of runnables.
    */
-  private void restartRunnableInstances(String runnableName, @Nullable 
Set<Integer> instanceIds,
+  private void restartRunnableInstances(final String runnableName, @Nullable 
final Set<Integer> instanceIds,
                                         final Runnable completion) {
-    Runnable restartInstancesRunnable = 
createRestartInstancesRunnable(runnableName, instanceIds, completion);
-    instanceChangeExecutor.execute(restartInstancesRunnable);
-  }
-
-  /**
-   * Creates a Runnable for execution of restart instances request.
-   */
-  private Runnable createRestartInstancesRunnable(final String runnableName, 
@Nullable final Set<Integer> instanceIds,
-                                                  final Runnable completion) {
-    return new Runnable() {
+    instanceChangeExecutor.execute(new Runnable() {
       @Override
       public void run() {
         LOG.debug("Begin restart runnable {} instances.", runnableName);
@@ -974,8 +937,11 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
           }
         }
 
+        LOG.info("All instances in {} for runnable {} are stopped. Ready to 
provision",
+                 instancesToRemove, runnableName);
+
         // set the container request to be ready
-        containerRequest.setReadyToBeProvisioned(true);
+        containerRequest.setReadyToBeProvisioned();
 
         // For all runnables that needs to re-request for containers, update 
the expected count timestamp
         // so that the EventHandler would be triggered with the right 
expiration timestamp.
@@ -983,7 +949,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
 
         completion.run();
       }
-    };
+    });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
index f4ebbd0..c0ffdb0 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
@@ -19,6 +19,8 @@ package org.apache.twill.internal.appmaster;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
 
 import java.util.Map;
 
@@ -30,13 +32,14 @@ final class ExpectedContainers {
 
   private final Map<String, ExpectedCount> expectedCounts;
 
-  ExpectedContainers(Map<String, Integer> expected) {
-    expectedCounts = Maps.newHashMap();
+  ExpectedContainers(TwillSpecification twillSpec) {
+    Map<String, ExpectedCount> expectedCounts = Maps.newHashMap();
     long now = System.currentTimeMillis();
-
-    for (Map.Entry<String, Integer> entry : expected.entrySet()) {
-      expectedCounts.put(entry.getKey(), new ExpectedCount(entry.getValue(), 
now));
+    for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values()) 
{
+      expectedCounts.put(runtimeSpec.getName(),
+                         new 
ExpectedCount(runtimeSpec.getResourceSpecification().getInstances(), now));
     }
+    this.expectedCounts = expectedCounts;
   }
 
   synchronized void setExpected(String runnable, int expected) {

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
index 91a58bb..a4e6af5 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -18,60 +18,42 @@
 
 package org.apache.twill.internal.appmaster;
 
-import com.google.common.collect.Sets;
 import org.apache.twill.api.TwillSpecification;
 
 import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 
 /**
  * This class provides helper functions for operating on a set of Placement 
Policies.
  */
-public class PlacementPolicyManager {
-  List<TwillSpecification.PlacementPolicy> placementPolicies;
+final class PlacementPolicyManager {
 
-  public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> 
placementPolicies) {
-    this.placementPolicies = placementPolicies;
-  }
+  private final Map<TwillSpecification.PlacementPolicy.Type, Set<String>> 
policyTypeToRunnables;
+  private final Map<String, TwillSpecification.PlacementPolicy> 
runnablePolicies;
 
-  /**
-   * Given a set of runnables, get all runnables which belong to DISTRIBUTED 
placement policies.
-   * @param givenRunnables Set of runnables.
-   * @return Subset of runnables, which belong to DISTRIBUTED placement 
policies.
-   */
-  public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
-    Set<String> distributedRunnables = getAllDistributedRunnables();
-    distributedRunnables.retainAll(givenRunnables);
-    return distributedRunnables;
-  }
+  PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> policies) {
+    this.policyTypeToRunnables = new 
EnumMap<>(TwillSpecification.PlacementPolicy.Type.class);
+    this.runnablePolicies = new HashMap<>();
 
-  /**
-   * Given a runnable, get the type of placement policy. Returns DEFAULT if no 
placement policy is specified.
-   * @param runnableName Name of runnable.
-   * @return Placement policy type of the runnable.
-   */
-  public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String 
runnableName) {
-    for (TwillSpecification.PlacementPolicy placementPolicy : 
placementPolicies) {
-      if (placementPolicy.getNames().contains(runnableName)) {
-        return placementPolicy.getType();
+    for (TwillSpecification.PlacementPolicy policy : policies) {
+      policyTypeToRunnables.put(policy.getType(), policy.getNames());
+      for (String runnable : policy.getNames()) {
+        runnablePolicies.put(runnable, policy);
       }
     }
-    return TwillSpecification.PlacementPolicy.Type.DEFAULT;
   }
 
   /**
-   * Get all runnables which belong to the same Placement policy as the given 
runnable.
-   * @param runnableName Name of runnable.
-   * @return Set of runnables, with same placement policy.
+   * Returns all runnables which belong to DISTRIBUTED placement policies.
    */
-  public Set<String> getFellowRunnables(String runnableName) {
-    for (TwillSpecification.PlacementPolicy placementPolicy : 
placementPolicies) {
-      if (placementPolicy.getNames().contains(runnableName)) {
-        return placementPolicy.getNames();
-      }
-    }
-    return Collections.emptySet();
+  Set<String> getDistributedRunnables() {
+    Set<String> runnables = 
policyTypeToRunnables.get(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED);
+    return runnables == null ? Collections.<String>emptySet() : runnables;
   }
 
   /**
@@ -80,25 +62,8 @@ public class PlacementPolicyManager {
    * @param runnableName Name of runnable.
    * @return Placement policy of the runnable.
    */
-  public TwillSpecification.PlacementPolicy getPlacementPolicy(String 
runnableName) {
-    for (TwillSpecification.PlacementPolicy placementPolicy : 
placementPolicies) {
-      if (placementPolicy.getNames().contains(runnableName)) {
-        return placementPolicy;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Gets all runnables which belong to DISTRIBUTED placement policies.
-   */
-  private Set<String> getAllDistributedRunnables() {
-    Set<String> distributedRunnables = Sets.newHashSet();
-    for (TwillSpecification.PlacementPolicy placementPolicy : 
placementPolicies) {
-      if 
(placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED))
 {
-        distributedRunnables.addAll(placementPolicy.getNames());
-      }
-    }
-    return  distributedRunnables;
+  @Nullable
+  TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+    return runnablePolicies.get(runnableName);
   }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index e001121..1dcffed 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -53,12 +53,12 @@ final class RunnableContainerRequest {
     return orderType;
   }
 
-  public boolean isReadyToBeProvisioned() {
+  boolean isReadyToBeProvisioned() {
     return isReadyToBeProvisioned;
   }
 
-  public void setReadyToBeProvisioned(boolean isProvisioned) {
-    this.isReadyToBeProvisioned = isProvisioned;
+  void setReadyToBeProvisioned() {
+    this.isReadyToBeProvisioned = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 0763f26..a950c46 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -236,7 +236,7 @@ final class RunningContainers {
     try {
       // Find the controller with particular instance id.
       for (Map.Entry<String, TwillContainerController> entry : 
containers.row(runnableName).entrySet()) {
-        if (getInstanceId(entry.getValue().getRunId()) == instanceId) {
+        if (entry.getValue().getInstanceId() == instanceId) {
           containerId = entry.getKey();
           controller = entry.getValue();
           break;
@@ -449,7 +449,7 @@ final class RunningContainers {
 
       for (Map.Entry<String, TwillContainerController> completedEntry : 
lookup.entrySet()) {
         TwillContainerController controller = completedEntry.getValue();
-        instanceId = getInstanceId(controller.getRunId());
+        instanceId = controller.getInstanceId();
 
         // TODO: Can there be multiple controllers for a single container?
         // TODO: What is the best way to determine whether to restart 
container when there are multiple controllers?
@@ -466,7 +466,7 @@ final class RunningContainers {
         }
         // TODO: should we remove the completed instance from instanceId and 
resource report even on failures?
         // TODO: won't they get added back when the container is re-requested?
-        removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
+        removeInstanceId(runnableName, controller.getInstanceId());
         resourceReport.removeRunnableResources(runnableName, containerId);
       }
       
@@ -651,11 +651,6 @@ final class RunningContainers {
     return RunIds.fromString(baseId.getId() + '-' + instanceId);
   }
 
-  private int getInstanceId(RunId runId) {
-    String id = runId.getId();
-    return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
-  }
-
   /**
    * Given the containerId, removes the corresponding containerInfo.
    * @param containerId Id for the container to be removed.

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index d28f810..e8f3b2b 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import javax.annotation.Nullable;
@@ -50,12 +51,11 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractYarnAMClient.class);
 
   // Map from a unique ID to inflight requests
-  private final Multimap<String, T> containerRequests;
-
-  // List of requests pending to send through allocate call
-  private final List<T> requests;
+  private final Multimap<String, T> inflightRequests;
+  // Map from a unique ID to pending requests that are not yet submitted to 
YARN
+  private final Multimap<String, T> pendingRequests;
   // List of requests pending to remove through allocate call
-  private final List<T> removes;
+  private final List<T> pendingRemoves;
   //List of pending blacklist additions for the next allocate call
   private final List<String> blacklistAdditions;
   //List of pending blacklist removals for the next allocate call
@@ -81,9 +81,9 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
     Preconditions.checkArgument(masterContainerId != null,
                                 "Missing %s from environment", 
containerIdEnvName);
     this.containerId = ConverterUtils.toContainerId(masterContainerId);
-    this.containerRequests = ArrayListMultimap.create();
-    this.requests = Lists.newLinkedList();
-    this.removes = Lists.newLinkedList();
+    this.inflightRequests = ArrayListMultimap.create();
+    this.pendingRequests = ArrayListMultimap.create();
+    this.pendingRemoves = Lists.newLinkedList();
     this.blacklistAdditions = Lists.newArrayList();
     this.blacklistRemovals = Lists.newArrayList();
     this.blacklistedResources = Lists.newArrayList();
@@ -109,16 +109,17 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
     // With bug YARN-314, if we mix the allocate call with new container 
request of the same priority,
     // in some cases the RM would not see the new request (based on sorting of 
resource capability),
     // but rather only see the one with size = 0.
-    if (removes.isEmpty()) {
-      for (T request : requests) {
-        addContainerRequest(request);
+    if (pendingRemoves.isEmpty()) {
+      for (Map.Entry<String, T> entry : pendingRequests.entries()) {
+        addContainerRequest(entry.getValue());
       }
-      requests.clear();
+      inflightRequests.putAll(pendingRequests);
+      pendingRequests.clear();
     } else {
-      for (T request : removes) {
+      for (T request : pendingRemoves) {
         removeContainerRequest(request);
       }
-      removes.clear();
+      pendingRemoves.clear();
     }
 
     if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@@ -131,7 +132,12 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
     List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
 
     if (!launchers.isEmpty()) {
-      handler.acquired(launchers);
+      // Only call handler acquire if there are actually inflight requests.
+      // This is to work around the YARN behavior that it can return more 
containers than being asked,
+      // such that it causes us to launch processes in the pending requests with 
the wrong container size.
+      if (!inflightRequests.isEmpty()) {
+        handler.acquired(launchers);
+      }
 
       // If no process has been launched through the given launcher, return 
the container.
       for (ProcessLauncher<YarnContainerInfo> l : launchers) {
@@ -140,7 +146,7 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
         if (!launcher.isLaunched()) {
           YarnContainerInfo containerInfo = launcher.getContainerInfo();
           // Casting is needed in Java 8, otherwise it complains about 
ambiguous method over the info(String, Throwable)
-          LOG.info("Nothing to run in container, releasing it: {}", (Object) 
containerInfo.getContainer());
+          LOG.info("Nothing to run in container, releasing it: {}", 
containerInfo.getContainer());
           releaseAssignedContainer(containerInfo);
         }
       }
@@ -153,11 +159,6 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
   }
 
   @Override
-  public final ContainerRequestBuilder addContainerRequest(Resource 
capability) {
-    return addContainerRequest(capability, 1);
-  }
-
-  @Override
   public final ContainerRequestBuilder addContainerRequest(Resource 
capability, int count) {
     return new ContainerRequestBuilder(adjustCapability(capability), count) {
       @Override
@@ -170,8 +171,7 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
 
           for (int i = 0; i < count; i++) {
             T request = createContainerRequest(priority, capability, hosts, 
racks, relaxLocality);
-            containerRequests.put(id, request);
-            requests.add(request);
+            pendingRequests.put(id, request);
           }
 
           return id;
@@ -208,8 +208,8 @@ public abstract class AbstractYarnAMClient<T> extends 
AbstractIdleService implem
 
   @Override
   public final synchronized void completeContainerRequest(String id) {
-    for (T request : containerRequests.removeAll(id)) {
-      removes.add(request);
+    for (T request : inflightRequests.removeAll(id)) {
+      pendingRemoves.add(request);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index a10181e..65856ca 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -86,10 +86,19 @@ public interface YarnAMClient extends Service {
     }
   }
 
+  /**
+   * Returns the container ID of the application.
+   */
   ContainerId getContainerId();
 
+  /**
+   * Returns the hostname of the node manager that the AM is running on.
+   */
   String getHost();
 
+  /**
+   * Returns the port of the node manager that the AM is running on.
+   */
   int getNMPort();
 
   /**
@@ -98,19 +107,10 @@ public interface YarnAMClient extends Service {
   void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
 
   /**
-   * Callback for allocate call.
+   * The heartbeat call to RM.
    */
-  // TODO: Move AM heartbeat logic into this interface so AM only needs to 
handle callback.
-  interface AllocateHandler {
-    void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> 
launchers);
-
-    void completed(List<YarnContainerStatus> completed);
-  }
-
   void allocate(float progress, AllocateHandler handler) throws Exception;
 
-  ContainerRequestBuilder addContainerRequest(Resource capability);
-
   ContainerRequestBuilder addContainerRequest(Resource capability, int count);
 
   void addToBlacklist(String resource);
@@ -129,4 +129,24 @@ public interface YarnAMClient extends Service {
    * @param id The ID returned by {@link 
YarnAMClient.ContainerRequestBuilder#apply()}.
    */
   void completeContainerRequest(String id);
+
+  /**
+   * Callback for allocate call.
+   */
+  interface AllocateHandler {
+
+    /**
+     * Invoked when a list of containers has been acquired from YARN.
+     *
+     * @param launchers list of launchers for launching runnables
+     */
+    void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> 
launchers);
+
+    /**
+     * Invoked when containers have completed.
+     *
+     * @param completed list of completed container status
+     */
+    void completed(List<YarnContainerStatus> completed);
+  }
 }

Reply via email to