Repository: twill
Updated Branches:
refs/heads/site 7bdf857c0 -> 3680e00f0
(TWILL-186) Guard against the case where YARN returns a container of mismatched size.
- Also make sure we don't remove a container request without adding it first
- Code cleanup for ApplicationMasterService and related classes
- Get rid of the inner loop in the doRun method
- The inner loop can block the heartbeat thread for too long if there are a
lot of runnable instances to stop
- Remove unnecessary Throwables.propagate calls
- Remove unnecessary intermediate method
- Better logging
- Request multiple instances in the same request
- Refactor/simplify placement-policy-related code
- Expose container instanceId instead of parsing it from runId
This closes #34 on Github.
Signed-off-by: Terence Yim <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/f4df32da
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/f4df32da
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/f4df32da
Branch: refs/heads/site
Commit: f4df32da26cbc7e4fb93f88fb0b1306575a9cf0e
Parents: 140f7da
Author: Terence Yim <[email protected]>
Authored: Mon Feb 27 21:22:32 2017 -0800
Committer: Terence Yim <[email protected]>
Committed: Wed Mar 1 16:18:51 2017 -0800
----------------------------------------------------------------------
.../internal/TwillContainerController.java | 10 +-
.../twill/internal/TwillContainerLauncher.java | 5 +
.../appmaster/ApplicationMasterService.java | 198 ++++++++-----------
.../internal/appmaster/ExpectedContainers.java | 13 +-
.../appmaster/PlacementPolicyManager.java | 77 ++------
.../appmaster/RunnableContainerRequest.java | 6 +-
.../internal/appmaster/RunningContainers.java | 11 +-
.../internal/yarn/AbstractYarnAMClient.java | 50 ++---
.../twill/internal/yarn/YarnAMClient.java | 40 +++-
9 files changed, 186 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
----------------------------------------------------------------------
diff --git
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
index 692e6b2..15689f5 100644
---
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
+++
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerController.java
@@ -22,6 +22,8 @@ import com.google.common.util.concurrent.Service;
import org.apache.twill.api.ServiceController;
import org.apache.twill.internal.state.Message;
+import javax.annotation.Nullable;
+
/**
* A {@link ServiceController} that allows sending a message directly.
Internal use only.
*/
@@ -36,7 +38,13 @@ public interface TwillContainerController extends
ServiceController, Service {
void completed(int exitStatus);
/**
- * @returns the container's live node data.
+ * @return the container's live node data.
*/
+ @Nullable
ContainerLiveNodeData getLiveNodeData();
+
+ /**
+ * @return the instance ID of the runnable that is running in the container
controlled by this controller.
+ */
+ int getInstanceId();
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 8dce91e..9b6384c 100644
---
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -262,6 +262,11 @@ public final class TwillContainerLauncher {
processController.cancel();
}
+ @Override
+ public int getInstanceId() {
+ return instanceId;
+ }
+
private void killAndWait(int maxWaitSecs) {
Stopwatch watch = new Stopwatch();
watch.start();
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index d5beb69..b4ac288 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
import com.google.common.collect.DiscreteDomains;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
@@ -160,8 +159,8 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
amClient.getContainerId().toString(), getLocalizeFiles());
- this.expectedContainers = initExpectedContainers(twillSpec);
- this.runningContainers = initRunningContainers(amClient.getContainerId(),
amClient.getHost());
+ this.expectedContainers = new ExpectedContainers(twillSpec);
+ this.runningContainers =
createRunningContainers(amClient.getContainerId(), amClient.getHost());
this.eventHandler = createEventHandler(twillSpec);
}
@@ -180,26 +179,22 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
@SuppressWarnings("unchecked")
@Nullable
- private EventHandler createEventHandler(TwillSpecification twillSpec) {
- try {
- // Should be able to load by this class ClassLoader, as they packaged in
the same jar.
- EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
- if (handlerSpec == null) {
- return null;
- }
-
- Class<?> handlerClass =
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
-
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
- "Class {} does not implements {}",
- handlerClass, EventHandler.class.getName());
- return Instances.newInstance((Class<? extends EventHandler>)
handlerClass);
- } catch (Exception e) {
- throw Throwables.propagate(e);
+ private EventHandler createEventHandler(TwillSpecification twillSpec) throws
ClassNotFoundException {
+ // Should be able to load by this class ClassLoader, as they packaged in
the same jar.
+ EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+ if (handlerSpec == null) {
+ return null;
}
+
+ Class<?> handlerClass =
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
+
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
+ "Class {} does not implements {}",
+ handlerClass, EventHandler.class.getName());
+ return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
}
- private RunningContainers initRunningContainers(ContainerId
appMasterContainerId,
- String appMasterHost) throws
Exception {
+ private RunningContainers createRunningContainers(ContainerId
appMasterContainerId,
+ String appMasterHost)
throws Exception {
TwillRunResources appMasterResources = new DefaultTwillRunResources(
0,
appMasterContainerId.toString(),
@@ -211,14 +206,6 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
twillSpec.getRunnables(), twillRuntimeSpec.getMaxRetries());
}
- private ExpectedContainers initExpectedContainers(TwillSpecification
twillSpec) {
- Map<String, Integer> expectedCounts = Maps.newHashMap();
- for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values())
{
- expectedCounts.put(runtimeSpec.getName(),
runtimeSpec.getResourceSpecification().getInstances());
- }
- return new ExpectedContainers(expectedCounts);
- }
-
@Override
public ResourceReport get() {
return runningContainers.getResourceReport();
@@ -392,8 +379,14 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
boolean isRequestRelaxed = false;
long nextTimeoutCheck = System.currentTimeMillis() +
Constants.PROVISION_TIMEOUT;
while (!stopped) {
- // Call allocate. It has to be made at first in order to be able to get
cluster resource availability.
- amClient.allocate(0.0f, allocateHandler);
+ TimeUnit.SECONDS.sleep(1);
+
+ try {
+ // Call allocate. It has to be made at first in order to be able to
get cluster resource availability.
+ amClient.allocate(0.0f, allocateHandler);
+ } catch (Exception e) {
+ LOG.warn("Exception raised when making heartbeat to RM. Will be
retried in next heartbeat.", e);
+ }
// Looks for containers requests.
if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() &&
runningContainers.isEmpty()) {
@@ -402,28 +395,22 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
}
// If nothing is in provisioning, and no pending request, move to next
one
- int count = runnableContainerRequests.size();
- LOG.debug("Runnable container requests: {}", count);
- while (provisioning.isEmpty() && currentRequest == null &&
!runnableContainerRequests.isEmpty()) {
- RunnableContainerRequest runnableContainerRequest =
runnableContainerRequests.peek();
- if (!runnableContainerRequest.isReadyToBeProvisioned()) {
- // take it out from queue and put it back at the end for second
chance.
- runnableContainerRequest = runnableContainerRequests.poll();
- runnableContainerRequests.add(runnableContainerRequest);
- LOG.debug("Request not ready: {}", runnableContainerRequest);
-
- // We checked all the requests that were pending when we started
this loop
- // Any remaining requests are not ready to be provisioned
- if (--count <= 0) {
- break;
- }
+ if (provisioning.isEmpty() && currentRequest == null &&
!runnableContainerRequests.isEmpty()) {
+ RunnableContainerRequest containerRequest =
runnableContainerRequests.peek();
+ // If the request at the head of the request queue is not yet ready,
move it to the end of the queue
+ // so that it won't block requests that are already ready
+ if (!containerRequest.isReadyToBeProvisioned()) {
+ LOG.debug("Request not ready: {}", containerRequest);
+ runnableContainerRequests.add(runnableContainerRequests.poll());
continue;
}
- currentRequest = runnableContainerRequest.takeRequest();
+
+ currentRequest = containerRequest.takeRequest();
if (currentRequest == null) {
// All different types of resource request from current order is
done, move to next one
// TODO: Need to handle order type as well
runnableContainerRequests.poll();
+ continue;
}
}
@@ -448,10 +435,6 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
}
nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
-
- if (isRunning()) {
- TimeUnit.SECONDS.sleep(1);
- }
}
}
@@ -462,28 +445,27 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
amClient.clearBlacklist();
//Check the allocation strategy
- AllocationSpecification currentAllocationSpecification = request.getKey();
- if
(currentAllocationSpecification.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME))
{
-
- //Check the placement policy
- TwillSpecification.PlacementPolicy placementPolicy =
-
placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
- if (placementPolicy != null
- &&
placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED))
{
-
- //Update blacklist with hosts which are running DISTRIBUTED runnables
- for (String runnable :
placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
- Collection<ContainerInfo> containerStats =
- runningContainers.getContainerInfo(runnable);
- for (ContainerInfo containerInfo : containerStats) {
- // Yarn Resource Manager may include port in the node name
depending on the setting
- // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is
safe to add both
- // the names (with and without port) to the blacklist.
- LOG.debug("Adding {} to host blacklist",
containerInfo.getHost().getHostName());
- amClient.addToBlacklist(containerInfo.getHost().getHostName());
- amClient.addToBlacklist(containerInfo.getHost().getHostName() +
":" + containerInfo.getPort());
- }
- }
+ AllocationSpecification allocationSpec = request.getKey();
+ if
(!allocationSpec.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME))
{
+ return;
+ }
+
+ //Check the placement policy
+ String runnableName = allocationSpec.getRunnableName();
+ TwillSpecification.PlacementPolicy placementPolicy =
placementPolicyManager.getPlacementPolicy(runnableName);
+ if (placementPolicy == null || placementPolicy.getType() !=
TwillSpecification.PlacementPolicy.Type.DISTRIBUTED) {
+ return;
+ }
+
+ //Update blacklist with hosts which are running DISTRIBUTED runnables
+ for (String runnable : placementPolicy.getNames()) {
+ for (ContainerInfo containerInfo :
runningContainers.getContainerInfo(runnable)) {
+ // Yarn Resource Manager may include port in the node name depending
on the setting
+ // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is
safe to add both
+ // the names (with and without port) to the blacklist.
+ LOG.debug("Adding {} to host blacklist",
containerInfo.getHost().getHostName());
+ amClient.addToBlacklist(containerInfo.getHost().getHostName());
+ amClient.addToBlacklist(containerInfo.getHost().getHostName() + ":" +
containerInfo.getPort());
}
}
}
@@ -501,9 +483,7 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
for (Multiset.Entry<String> entry : restartRunnables.entrySet()) {
LOG.info("Re-request container for {} with {} instances.",
entry.getElement(), entry.getCount());
- for (int i = 0; i < entry.getCount(); i++) {
-
runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement()));
- }
+
runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement(),
entry.getCount()));
}
// For all runnables that needs to re-request for containers, update the
expected count timestamp
@@ -584,10 +564,10 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
Queue<RunnableContainerRequest> requests = new ConcurrentLinkedQueue<>();
// For each order in the twillSpec, create container request for
runnables, depending on Placement policy.
for (TwillSpecification.Order order : twillSpec.getOrders()) {
- Set<String> distributedRunnables =
placementPolicyManager.getDistributedRunnables(order.getNames());
- Set<String> defaultRunnables = Sets.newHashSet();
- defaultRunnables.addAll(order.getNames());
- defaultRunnables.removeAll(distributedRunnables);
+ Set<String> distributedRunnables =
Sets.intersection(placementPolicyManager.getDistributedRunnables(),
+ order.getNames());
+ Set<String> defaultRunnables = Sets.difference(order.getNames(),
distributedRunnables);
+
Map<AllocationSpecification, Collection<RuntimeSpecification>>
requestsMap = Maps.newHashMap();
for (String runnableName : distributedRunnables) {
RuntimeSpecification runtimeSpec =
twillSpec.getRunnables().get(runnableName);
@@ -637,19 +617,19 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
//Spawning 1 instance at a time
newContainers = 1;
}
+
+ // TODO: Allow user to set priority?
+ LOG.info("Request {} containers with capability {} for runnable {}",
newContainers, capability, name);
+ YarnAMClient.ContainerRequestBuilder builder =
amClient.addContainerRequest(capability, newContainers);
+ builder.setPriority(0);
+
TwillSpecification.PlacementPolicy placementPolicy =
placementPolicyManager.getPlacementPolicy(name);
- Set<String> hosts = Sets.newHashSet();
- Set<String> racks = Sets.newHashSet();
if (placementPolicy != null) {
- hosts = placementPolicy.getHosts();
- racks = placementPolicy.getRacks();
+ builder.addHosts(placementPolicy.getHosts())
+ .addRacks(placementPolicy.getRacks());
}
- // TODO: Allow user to set priority?
- LOG.info("Request {} containers with capability {} for runnable {}",
newContainers, capability, name);
- String requestId = amClient.addContainerRequest(capability,
newContainers)
- .addHosts(hosts)
- .addRacks(racks)
- .setPriority(0).apply();
+
+ String requestId = builder.apply();
provisioning.add(new ProvisionRequest(runtimeSpec, requestId,
newContainers, allocationType));
}
}
@@ -661,14 +641,14 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
private void launchRunnable(List<? extends
ProcessLauncher<YarnContainerInfo>> launchers,
Queue<ProvisionRequest> provisioning) {
for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
- LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
+ LOG.info("Container allocated: {}",
processLauncher.getContainerInfo().getContainer());
ProvisionRequest provisionRequest = provisioning.peek();
if (provisionRequest == null) {
continue;
}
String runnableName = provisionRequest.getRuntimeSpec().getName();
- LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
+ LOG.info("Starting runnable {} in {}", runnableName,
processLauncher.getContainerInfo().getContainer());
LOG.debug("Log level for Twill runnable {} is {}", runnableName,
twillRuntimeSpec.getLogLevels().get(runnableName).get(Logger.ROOT_LOGGER_NAME));
@@ -713,17 +693,15 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
}
}
- private List<LocalFile> getLocalizeFiles() {
+ private List<LocalFile> getLocalizeFiles() throws IOException {
try (Reader reader =
Files.newBufferedReader(Paths.get(Constants.Files.LOCALIZE_FILES),
StandardCharsets.UTF_8)) {
return new GsonBuilder().registerTypeAdapter(LocalFile.class, new
LocalFileCodec())
.create().fromJson(reader, new TypeToken<List<LocalFile>>() {
}.getType());
- } catch (IOException e) {
- throw Throwables.propagate(e);
}
}
- private Map<String, Map<String, String>> getEnvironments() {
+ private Map<String, Map<String, String>> getEnvironments() throws
IOException {
Path envFile = Paths.get(Constants.Files.RUNTIME_CONFIG_JAR,
Constants.Files.ENVIRONMENTS);
if (!Files.exists(envFile)) {
return new HashMap<>();
@@ -732,8 +710,6 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
try (Reader reader = Files.newBufferedReader(envFile,
StandardCharsets.UTF_8)) {
return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String,
String>>>() {
}.getType());
- } catch (IOException e) {
- throw Throwables.propagate(e);
}
}
@@ -823,10 +799,6 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
};
}
- private RunnableContainerRequest createRunnableContainerRequest(final String
runnableName) {
- return createRunnableContainerRequest(runnableName, 1);
- }
-
private RunnableContainerRequest createRunnableContainerRequest(final String
runnableName,
final int
numberOfInstances) {
return createRunnableContainerRequest(runnableName, numberOfInstances,
true);
@@ -846,8 +818,8 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
RuntimeSpecification runtimeSpec =
twillSpec.getRunnables().get(runnableName);
Resource capability =
createCapability(runtimeSpec.getResourceSpecification());
Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap
= Maps.newHashMap();
- if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
- TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+
+ if
(placementPolicyManager.getDistributedRunnables().contains(runnableName)) {
for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
AllocationSpecification allocationSpecification =
new AllocationSpecification(capability,
AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
@@ -938,18 +910,9 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
/**
* Helper method to restart instances of runnables.
*/
- private void restartRunnableInstances(String runnableName, @Nullable
Set<Integer> instanceIds,
+ private void restartRunnableInstances(final String runnableName, @Nullable
final Set<Integer> instanceIds,
final Runnable completion) {
- Runnable restartInstancesRunnable =
createRestartInstancesRunnable(runnableName, instanceIds, completion);
- instanceChangeExecutor.execute(restartInstancesRunnable);
- }
-
- /**
- * Creates a Runnable for execution of restart instances request.
- */
- private Runnable createRestartInstancesRunnable(final String runnableName,
@Nullable final Set<Integer> instanceIds,
- final Runnable completion) {
- return new Runnable() {
+ instanceChangeExecutor.execute(new Runnable() {
@Override
public void run() {
LOG.debug("Begin restart runnable {} instances.", runnableName);
@@ -974,8 +937,11 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
}
}
+ LOG.info("All instances in {} for runnable {} are stopped. Ready to
provision",
+ instancesToRemove, runnableName);
+
// set the container request to be ready
- containerRequest.setReadyToBeProvisioned(true);
+ containerRequest.setReadyToBeProvisioned();
// For all runnables that needs to re-request for containers, update
the expected count timestamp
// so that the EventHandler would be triggered with the right
expiration timestamp.
@@ -983,7 +949,7 @@ public final class ApplicationMasterService extends
AbstractYarnTwillService imp
completion.run();
}
- };
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
index f4ebbd0..c0ffdb0 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
@@ -19,6 +19,8 @@ package org.apache.twill.internal.appmaster;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
import java.util.Map;
@@ -30,13 +32,14 @@ final class ExpectedContainers {
private final Map<String, ExpectedCount> expectedCounts;
- ExpectedContainers(Map<String, Integer> expected) {
- expectedCounts = Maps.newHashMap();
+ ExpectedContainers(TwillSpecification twillSpec) {
+ Map<String, ExpectedCount> expectedCounts = Maps.newHashMap();
long now = System.currentTimeMillis();
-
- for (Map.Entry<String, Integer> entry : expected.entrySet()) {
- expectedCounts.put(entry.getKey(), new ExpectedCount(entry.getValue(),
now));
+ for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values())
{
+ expectedCounts.put(runtimeSpec.getName(),
+ new
ExpectedCount(runtimeSpec.getResourceSpecification().getInstances(), now));
}
+ this.expectedCounts = expectedCounts;
}
synchronized void setExpected(String runnable, int expected) {
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
index 91a58bb..a4e6af5 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -18,60 +18,42 @@
package org.apache.twill.internal.appmaster;
-import com.google.common.collect.Sets;
import org.apache.twill.api.TwillSpecification;
import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
/**
* This class provides helper functions for operating on a set of Placement
Policies.
*/
-public class PlacementPolicyManager {
- List<TwillSpecification.PlacementPolicy> placementPolicies;
+final class PlacementPolicyManager {
- public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy>
placementPolicies) {
- this.placementPolicies = placementPolicies;
- }
+ private final Map<TwillSpecification.PlacementPolicy.Type, Set<String>>
policyTypeToRunnables;
+ private final Map<String, TwillSpecification.PlacementPolicy>
runnablePolicies;
- /**
- * Given a set of runnables, get all runnables which belong to DISTRIBUTED
placement policies.
- * @param givenRunnables Set of runnables.
- * @return Subset of runnables, which belong to DISTRIBUTED placement
policies.
- */
- public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
- Set<String> distributedRunnables = getAllDistributedRunnables();
- distributedRunnables.retainAll(givenRunnables);
- return distributedRunnables;
- }
+ PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> policies) {
+ this.policyTypeToRunnables = new
EnumMap<>(TwillSpecification.PlacementPolicy.Type.class);
+ this.runnablePolicies = new HashMap<>();
- /**
- * Given a runnable, get the type of placement policy. Returns DEFAULT if no
placement policy is specified.
- * @param runnableName Name of runnable.
- * @return Placement policy type of the runnable.
- */
- public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String
runnableName) {
- for (TwillSpecification.PlacementPolicy placementPolicy :
placementPolicies) {
- if (placementPolicy.getNames().contains(runnableName)) {
- return placementPolicy.getType();
+ for (TwillSpecification.PlacementPolicy policy : policies) {
+ policyTypeToRunnables.put(policy.getType(), policy.getNames());
+ for (String runnable : policy.getNames()) {
+ runnablePolicies.put(runnable, policy);
}
}
- return TwillSpecification.PlacementPolicy.Type.DEFAULT;
}
/**
- * Get all runnables which belong to the same Placement policy as the given
runnable.
- * @param runnableName Name of runnable.
- * @return Set of runnables, with same placement policy.
+ * Returns all runnables which belong to DISTRIBUTED placement policies.
*/
- public Set<String> getFellowRunnables(String runnableName) {
- for (TwillSpecification.PlacementPolicy placementPolicy :
placementPolicies) {
- if (placementPolicy.getNames().contains(runnableName)) {
- return placementPolicy.getNames();
- }
- }
- return Collections.emptySet();
+ Set<String> getDistributedRunnables() {
+ Set<String> runnables =
policyTypeToRunnables.get(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED);
+ return runnables == null ? Collections.<String>emptySet() : runnables;
}
/**
@@ -80,25 +62,8 @@ public class PlacementPolicyManager {
* @param runnableName Name of runnable.
* @return Placement policy of the runnable.
*/
- public TwillSpecification.PlacementPolicy getPlacementPolicy(String
runnableName) {
- for (TwillSpecification.PlacementPolicy placementPolicy :
placementPolicies) {
- if (placementPolicy.getNames().contains(runnableName)) {
- return placementPolicy;
- }
- }
- return null;
- }
-
- /**
- * Gets all runnables which belong to DISTRIBUTED placement policies.
- */
- private Set<String> getAllDistributedRunnables() {
- Set<String> distributedRunnables = Sets.newHashSet();
- for (TwillSpecification.PlacementPolicy placementPolicy :
placementPolicies) {
- if
(placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED))
{
- distributedRunnables.addAll(placementPolicy.getNames());
- }
- }
- return distributedRunnables;
+ @Nullable
+ TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+ return runnablePolicies.get(runnableName);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index e001121..1dcffed 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -53,12 +53,12 @@ final class RunnableContainerRequest {
return orderType;
}
- public boolean isReadyToBeProvisioned() {
+ boolean isReadyToBeProvisioned() {
return isReadyToBeProvisioned;
}
- public void setReadyToBeProvisioned(boolean isProvisioned) {
- this.isReadyToBeProvisioned = isProvisioned;
+ void setReadyToBeProvisioned() {
+ this.isReadyToBeProvisioned = true;
}
/**
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 0763f26..a950c46 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -236,7 +236,7 @@ final class RunningContainers {
try {
// Find the controller with particular instance id.
for (Map.Entry<String, TwillContainerController> entry :
containers.row(runnableName).entrySet()) {
- if (getInstanceId(entry.getValue().getRunId()) == instanceId) {
+ if (entry.getValue().getInstanceId() == instanceId) {
containerId = entry.getKey();
controller = entry.getValue();
break;
@@ -449,7 +449,7 @@ final class RunningContainers {
for (Map.Entry<String, TwillContainerController> completedEntry :
lookup.entrySet()) {
TwillContainerController controller = completedEntry.getValue();
- instanceId = getInstanceId(controller.getRunId());
+ instanceId = controller.getInstanceId();
// TODO: Can there be multiple controllers for a single container?
// TODO: What is the best way to determine whether to restart
container when there are multiple controllers?
@@ -466,7 +466,7 @@ final class RunningContainers {
}
// TODO: should we remove the completed instance from instanceId and
resource report even on failures?
// TODO: won't they get added back when the container is re-requested?
- removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
+ removeInstanceId(runnableName, controller.getInstanceId());
resourceReport.removeRunnableResources(runnableName, containerId);
}
@@ -651,11 +651,6 @@ final class RunningContainers {
return RunIds.fromString(baseId.getId() + '-' + instanceId);
}
- private int getInstanceId(RunId runId) {
- String id = runId.getId();
- return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
- }
-
/**
* Given the containerId, removes the corresponding containerInfo.
* @param containerId Id for the container to be removed.
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index d28f810..e8f3b2b 100644
---
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
@@ -50,12 +51,11 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
private static final Logger LOG =
LoggerFactory.getLogger(AbstractYarnAMClient.class);
// Map from a unique ID to inflight requests
- private final Multimap<String, T> containerRequests;
-
- // List of requests pending to send through allocate call
- private final List<T> requests;
+ private final Multimap<String, T> inflightRequests;
+ // Map from a unique ID to pending requests that are not yet submitted to
YARN
+ private final Multimap<String, T> pendingRequests;
// List of requests pending to remove through allocate call
- private final List<T> removes;
+ private final List<T> pendingRemoves;
//List of pending blacklist additions for the next allocate call
private final List<String> blacklistAdditions;
//List of pending blacklist removals for the next allocate call
@@ -81,9 +81,9 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
Preconditions.checkArgument(masterContainerId != null,
"Missing %s from environment",
containerIdEnvName);
this.containerId = ConverterUtils.toContainerId(masterContainerId);
- this.containerRequests = ArrayListMultimap.create();
- this.requests = Lists.newLinkedList();
- this.removes = Lists.newLinkedList();
+ this.inflightRequests = ArrayListMultimap.create();
+ this.pendingRequests = ArrayListMultimap.create();
+ this.pendingRemoves = Lists.newLinkedList();
this.blacklistAdditions = Lists.newArrayList();
this.blacklistRemovals = Lists.newArrayList();
this.blacklistedResources = Lists.newArrayList();
@@ -109,16 +109,17 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
// With bug YARN-314, if we mix the allocate call with new container
request of the same priority,
// in some cases the RM would not see the new request (based on sorting of
resource capability),
// but rather only see the one with size = 0.
- if (removes.isEmpty()) {
- for (T request : requests) {
- addContainerRequest(request);
+ if (pendingRemoves.isEmpty()) {
+ for (Map.Entry<String, T> entry : pendingRequests.entries()) {
+ addContainerRequest(entry.getValue());
}
- requests.clear();
+ inflightRequests.putAll(pendingRequests);
+ pendingRequests.clear();
} else {
- for (T request : removes) {
+ for (T request : pendingRemoves) {
removeContainerRequest(request);
}
- removes.clear();
+ pendingRemoves.clear();
}
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
@@ -131,7 +132,12 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
if (!launchers.isEmpty()) {
- handler.acquired(launchers);
+ // Only call handler.acquired() if there are actually inflight requests.
+ // This works around the YARN behavior of returning more containers than were requested,
+ // which would otherwise cause processes for pending requests to be launched with the wrong container size.
+ if (!inflightRequests.isEmpty()) {
+ handler.acquired(launchers);
+ }
// If no process has been launched through the given launcher, return
the container.
for (ProcessLauncher<YarnContainerInfo> l : launchers) {
@@ -140,7 +146,7 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
if (!launcher.isLaunched()) {
YarnContainerInfo containerInfo = launcher.getContainerInfo();
// Casting is needed in Java 8, otherwise it complains about
ambiguous method over the info(String, Throwable)
- LOG.info("Nothing to run in container, releasing it: {}", (Object)
containerInfo.getContainer());
+ LOG.info("Nothing to run in container, releasing it: {}",
containerInfo.getContainer());
releaseAssignedContainer(containerInfo);
}
}
@@ -153,11 +159,6 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
}
@Override
- public final ContainerRequestBuilder addContainerRequest(Resource
capability) {
- return addContainerRequest(capability, 1);
- }
-
- @Override
public final ContainerRequestBuilder addContainerRequest(Resource
capability, int count) {
return new ContainerRequestBuilder(adjustCapability(capability), count) {
@Override
@@ -170,8 +171,7 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
for (int i = 0; i < count; i++) {
T request = createContainerRequest(priority, capability, hosts,
racks, relaxLocality);
- containerRequests.put(id, request);
- requests.add(request);
+ pendingRequests.put(id, request);
}
return id;
@@ -208,8 +208,8 @@ public abstract class AbstractYarnAMClient<T> extends
AbstractIdleService implem
@Override
public final synchronized void completeContainerRequest(String id) {
- for (T request : containerRequests.removeAll(id)) {
- removes.add(request);
+ for (T request : inflightRequests.removeAll(id)) {
+ pendingRemoves.add(request);
}
}
http://git-wip-us.apache.org/repos/asf/twill/blob/f4df32da/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index a10181e..65856ca 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -86,10 +86,19 @@ public interface YarnAMClient extends Service {
}
}
+ /**
+ * Returns the container ID of the application.
+ */
ContainerId getContainerId();
+ /**
+ * Returns the hostname of the node manager that the AM is running on
+ */
String getHost();
+ /**
+ * Returns the port of the node manager that the AM is running on
+ */
int getNMPort();
/**
@@ -98,19 +107,10 @@ public interface YarnAMClient extends Service {
void setTracker(InetSocketAddress trackerAddr, URL trackerUrl);
/**
- * Callback for allocate call.
+ * The heartbeat call to RM.
*/
- // TODO: Move AM heartbeat logic into this interface so AM only needs to
handle callback.
- interface AllocateHandler {
- void acquired(List<? extends ProcessLauncher<YarnContainerInfo>>
launchers);
-
- void completed(List<YarnContainerStatus> completed);
- }
-
void allocate(float progress, AllocateHandler handler) throws Exception;
- ContainerRequestBuilder addContainerRequest(Resource capability);
-
ContainerRequestBuilder addContainerRequest(Resource capability, int count);
void addToBlacklist(String resource);
@@ -129,4 +129,24 @@ public interface YarnAMClient extends Service {
* @param id The ID returned by {@link
YarnAMClient.ContainerRequestBuilder#apply()}.
*/
void completeContainerRequest(String id);
+
+ /**
+ * Callback for the {@link #allocate(float, AllocateHandler)} heartbeat call to the RM.
+ */
+ interface AllocateHandler {
+
+ /**
+ * Invoked when a list of containers has been acquired from YARN.
+ *
+ * @param launchers launchers for launching runnables into the acquired containers
+ */
+ void acquired(List<? extends ProcessLauncher<YarnContainerInfo>>
launchers);
+
+ /**
+ * Invoked when containers have completed.
+ *
+ * @param completed list of statuses of the completed containers
+ */
+ void completed(List<YarnContainerStatus> completed);
+ }
}