This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2e17e08 SAMZA-2266: Introduce a backoff when there are repeated
failures for host-affinity allocations (#1104)
2e17e08 is described below
commit 2e17e08ba2469409243bc1749c608ba31782ff5b
Author: Daniel Nishimura <[email protected]>
AuthorDate: Wed Aug 14 10:44:47 2019 -0700
SAMZA-2266: Introduce a backoff when there are repeated failures for
host-affinity allocations (#1104)
* SAMZA-2266: Introduce a backoff when there are repeated failures for
host-affinity allocations
---
.../versioned/jobs/samza-configurations.md | 1 +
.../clustermanager/AbstractContainerAllocator.java | 60 +++-
.../samza/clustermanager/ContainerAllocator.java | 5 +-
.../clustermanager/ContainerProcessManager.java | 298 ++++++++++-------
.../HostAwareContainerAllocator.java | 18 +-
.../samza/clustermanager/ProcessorFailure.java | 22 +-
.../samza/clustermanager/ResourceRequestState.java | 127 +++++--
.../samza/clustermanager/SamzaResourceRequest.java | 35 +-
.../clustermanager/StandbyContainerManager.java | 14 +-
.../apache/samza/config/ClusterManagerConfig.java | 21 +-
.../clustermanager/TestContainerAllocator.java | 22 +-
.../TestContainerProcessManager.java | 366 ++++++++++++---------
.../clustermanager/TestContainerRequestState.java | 94 +++++-
.../TestHostAwareContainerAllocator.java | 18 +-
14 files changed, 733 insertions(+), 368 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index cde7594..2309555 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -295,6 +295,7 @@ Samza supports both standalone and clustered
([YARN](yarn-jobs.html)) [deploymen
|--- |--- |--- |
|cluster-manager.container.retry.count|8|If a container fails, it is
automatically restarted by Samza. However, if a container keeps failing shortly
after startup, that indicates a deeper problem, so we should kill the job
rather than retrying indefinitely. This property determines the maximum number
of times we are willing to restart a failed container in quick succession (the
time period is configured with `cluster-manager.container.retry.window.ms`).
Each container in the job is count [...]
|cluster-manager.container.retry.window.ms|300000|This property determines how
frequently a container is allowed to fail before we give up and fail the job.
If the same container has failed more than
`cluster-manager.container.retry.count` times, and the time between failures
was less than this property `cluster-manager.container.retry.window.ms` (in
milliseconds), then we fail the job. There is no limit to the number of times
we will restart a container if the time between failures is g [...]
+|cluster-manager.container.preferred-host.last.retry.delay.ms|360000|The delay
added to the last retry for a failing container after all but one of
`cluster-manager.container.retry.count` retries have been exhausted. The delay is
only added when `job.host-affinity.enabled` is true and the retried request is
for a preferred host. This addresses the issue where there may be a delay when
a preferred host is marked invalid and the container continuously attempts to
restart and fail on the inva [...]
|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor
of `job.jmx.enabled`|
|cluster-manager.allocator.sleep.ms|3600|The container allocator thread is
responsible for matching requests to allocated containers. The sleep interval
for this thread is configured using this property.|
|cluster-manager.container.request.timeout.ms|5000|The allocator thread
periodically checks the state of the container requests and allocated
containers to determine the assignment of a container to an allocated resource.
This property determines the number of milliseconds before a container request
is considered to have expired / timed-out. When a request expires, it gets
allocated to any available container that was returned by the cluster manager.|
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index c5caa21..bba4b52 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -18,6 +18,10 @@
*/
package org.apache.samza.clustermanager;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
@@ -28,8 +32,6 @@ import org.apache.samza.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* {@link AbstractContainerAllocator} makes requests for physical resources to
the resource manager and also runs
@@ -110,6 +112,10 @@ public abstract class AbstractContainerAllocator
implements Runnable {
while (isRunning) {
try {
assignResourceRequests();
+
+ // Move delayed requests that are ready to the active request queue
+ resourceRequestState.sendPendingDelayedResourceRequests();
+
// Release extra resources and update the entire system's state
resourceRequestState.releaseExtraResources();
@@ -151,7 +157,7 @@ public abstract class AbstractContainerAllocator implements
Runnable {
// Run processor on resource
log.info("Found Container ID: {} for Processor ID: {} on host: {} for
request creation time: {}.",
- resource.getContainerId(), processorId, preferredHost,
request.getRequestTimestampMs());
+ resource.getContainerId(), processorId, preferredHost,
request.getRequestTimestamp());
// Update processor state as "pending" and then issue a request to launch
it. It's important to perform the state-update
// prior to issuing the request. Otherwise, there's a race where the
response callback may arrive sooner and not see
@@ -175,35 +181,65 @@ public abstract class AbstractContainerAllocator
implements Runnable {
public abstract void requestResources(Map<String, String>
processorToHostMapping);
/**
- * Checks if this allocator has a pending resource request.
+ * Checks if this allocator has a pending resource request with a request
timestamp equal to or earlier than the current
+ * timestamp.
* @return {@code true} if there is a pending request, {@code false}
otherwise.
*/
- protected final boolean hasPendingRequest() {
- return peekPendingRequest() != null;
+ protected final boolean hasReadyPendingRequest() {
+ return peekReadyPendingRequest().isPresent();
}
/**
- * Retrieves, but does not remove, the next pending request in the queue.
+ * Retrieves, but does not remove, the next pending request in the queue
with the {@link SamzaResourceRequest#getRequestTimestamp()}
+ * that is equal to or earlier than the current timestamp.
*
* @return the pending request or {@code null} if there is no pending
request.
*/
- protected final SamzaResourceRequest peekPendingRequest() {
- return resourceRequestState.peekPendingRequest();
+ protected final Optional<SamzaResourceRequest> peekReadyPendingRequest() {
+ SamzaResourceRequest pendingRequest =
resourceRequestState.peekPendingRequest();
+ return Optional.ofNullable(pendingRequest);
}
/**
* Requests a resource from the cluster manager
- *
* @param processorId Samza processor ID that will be run when a resource is
allocated for this request
* @param preferredHost name of the host that you prefer to run the
processor on
*/
public final void requestResource(String processorId, String preferredHost) {
- SamzaResourceRequest request = getResourceRequest(processorId,
preferredHost);
+ requestResourceWithDelay(processorId, preferredHost, Duration.ZERO);
+ }
+
+ /**
+ * Requests a resource from the cluster manager with a request timestamp of
the current time plus the specified delay.
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param delay the {@link Duration} to add to the request timestamp
+ */
+ public final void requestResourceWithDelay(String processorId, String
preferredHost, Duration delay) {
+ SamzaResourceRequest request = getResourceRequestWithDelay(processorId,
preferredHost, delay);
issueResourceRequest(request);
}
+ /**
+ * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @return the created request
+ */
public final SamzaResourceRequest getResourceRequest(String processorId,
String preferredHost) {
- return new SamzaResourceRequest(this.containerNumCpuCores,
this.containerMemoryMb, preferredHost, processorId);
+ return getResourceRequestWithDelay(processorId, preferredHost,
Duration.ZERO);
+ }
+
+ /**
+ * Creates a {@link SamzaResourceRequest} to send to the cluster manager
with a request timestamp of the current time
+ * plus the specified delay.
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param delay the {@link Duration} to add to the request timestamp
+ * @return the created request
+ */
+ public final SamzaResourceRequest getResourceRequestWithDelay(String
processorId, String preferredHost, Duration delay) {
+ return new SamzaResourceRequest(this.containerNumCpuCores,
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
}
public final void issueResourceRequest(SamzaResourceRequest request) {
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 9a1e31e..9078f25 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -51,9 +51,8 @@ public class ContainerAllocator extends
AbstractContainerAllocator {
* */
@Override
public void assignResourceRequests() {
- while (hasPendingRequest() &&
hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
- SamzaResourceRequest request = peekPendingRequest();
- runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+ while (hasReadyPendingRequest() &&
hasAllocatedResource(ResourceRequestState.ANY_HOST)) {
+ peekReadyPendingRequest().ifPresent(request ->
runStreamProcessor(request, ResourceRequestState.ANY_HOST));
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index cba8176..8e12f82 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.clustermanager;
import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
@@ -64,7 +65,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
*/
public class ContainerProcessManager implements
ClusterResourceManager.Callback {
- private static final Logger log =
LoggerFactory.getLogger(ContainerProcessManager.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ContainerProcessManager.class);
/**
* Metrics for the {@link ContainerProcessManager}
@@ -72,7 +73,6 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
private final static String METRICS_SOURCE_NAME = "ApplicationMaster";
private final static String EXEC_ENV_CONTAINER_ID_SYS_PROPERTY =
"CONTAINER_ID";
-
/**
* Does this Samza Job need hostAffinity when containers are allocated.
*/
@@ -171,19 +171,19 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
buildContainerAllocator(this.hostAffinityEnabled,
this.clusterResourceManager, this.clusterManagerConfig,
config, this.standbyContainerManager, state, classLoader);
this.allocatorThread = new Thread(this.containerAllocator, "Container
Allocator Thread");
- log.info("Finished container process manager initialization.");
+ LOG.info("Finished container process manager initialization.");
}
@VisibleForTesting
- ContainerProcessManager(Config config,
+ ContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
MetricsRegistryMap registry,
ClusterResourceManager resourceManager,
Optional<AbstractContainerAllocator> allocator,
ClassLoader classLoader) {
this.state = state;
- this.clusterManagerConfig = new ClusterManagerConfig(config);
- this.jobConfig = new JobConfig(config);
+ this.clusterManagerConfig = clusterManagerConfig;
+ this.jobConfig = new JobConfig(clusterManagerConfig);
this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
@@ -192,10 +192,10 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
this.diagnosticsManager = Option.empty();
this.containerAllocator = allocator.orElseGet(
() -> buildContainerAllocator(this.hostAffinityEnabled,
this.clusterResourceManager, this.clusterManagerConfig,
- config, this.standbyContainerManager, state, classLoader));
+ clusterManagerConfig, this.standbyContainerManager, state,
classLoader));
this.allocatorThread = new Thread(this.containerAllocator, "Container
Allocator Thread");
- log.info("Finished container process manager initialization");
+ LOG.info("Finished container process manager initialization");
}
private static AbstractContainerAllocator buildContainerAllocator(boolean
hostAffinityEnabled,
@@ -210,32 +210,39 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
}
public boolean shouldShutdown() {
- log.debug("ContainerProcessManager state: Completed containers: {},
Configured containers: {}, Are there too many failed containers: {}, Is
allocator thread alive: {}",
+ LOG.debug("ContainerProcessManager state: Completed containers: {},
Configured containers: {}, Are there too many failed containers: {}, Is
allocator thread alive: {}",
state.completedProcessors.get(), state.processorCount,
jobFailureCriteriaMet ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
if (exceptionOccurred != null) {
- log.error("Exception in container process manager", exceptionOccurred);
+ LOG.error("Exception in container process manager", exceptionOccurred);
throw new SamzaException(exceptionOccurred);
}
return jobFailureCriteriaMet || state.completedProcessors.get() ==
state.processorCount.get() || !allocatorThread.isAlive();
}
public void start() {
- log.info("Starting the container process manager");
+ LOG.info("Starting the container process manager");
+
+ int containerRetryCount = clusterManagerConfig.getContainerRetryCount();
+ if (containerRetryCount > -1) {
+ LOG.info("Max retries on restarting failed containers: {}",
containerRetryCount);
+ } else {
+ LOG.info("Infinite retries on restarting failed containers");
+ }
if (jvmMetrics != null) {
jvmMetrics.start();
}
- if (this.metricsReporters != null) {
- this.metricsReporters.values().forEach(reporter -> reporter.start());
+ if (metricsReporters != null) {
+ metricsReporters.values().forEach(reporter -> reporter.start());
}
- if (this.diagnosticsManager.isDefined()) {
- this.diagnosticsManager.get().start();
+ if (diagnosticsManager.isDefined()) {
+ diagnosticsManager.get().start();
}
- log.info("Starting the cluster resource manager");
+ LOG.info("Starting the cluster resource manager");
clusterResourceManager.start();
state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
@@ -246,59 +253,59 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
containerAllocator.requestResources(processorToHostMapping);
// Start container allocator thread
- log.info("Starting the container allocator thread");
+ LOG.info("Starting the container allocator thread");
allocatorThread.start();
- log.info("Starting the container process manager");
+ LOG.info("Starting the container process manager");
}
public void stop() {
- log.info("Stopping the container process manager");
+ LOG.info("Stopping the container process manager");
// Shutdown allocator thread
containerAllocator.stop();
try {
allocatorThread.join();
- log.info("Stopped container allocator");
+ LOG.info("Stopped container allocator");
} catch (InterruptedException ie) {
- log.error("Allocator thread join threw an interrupted exception", ie);
+ LOG.error("Allocator thread join threw an interrupted exception", ie);
Thread.currentThread().interrupt();
}
- if (this.diagnosticsManager.isDefined()) {
+ if (diagnosticsManager.isDefined()) {
try {
- this.diagnosticsManager.get().stop();
+ diagnosticsManager.get().stop();
} catch (InterruptedException e) {
- log.error("InterruptedException while stopping diagnosticsManager", e);
+ LOG.error("InterruptedException while stopping diagnosticsManager", e);
}
}
try {
- if (this.metricsReporters != null) {
- this.metricsReporters.values().forEach(reporter -> reporter.stop());
+ if (metricsReporters != null) {
+ metricsReporters.values().forEach(reporter -> reporter.stop());
}
- if (this.jvmMetrics != null) {
+ if (jvmMetrics != null) {
jvmMetrics.stop();
}
- log.info("Stopped containerProcessManagerMetrics reporters");
+ LOG.info("Stopped containerProcessManagerMetrics reporters");
} catch (Throwable e) {
- log.error("Exception while stopping containerProcessManagerMetrics", e);
+ LOG.error("Exception while stopping containerProcessManagerMetrics", e);
}
try {
clusterResourceManager.stop(state.status);
- log.info("Stopped the cluster resource manager");
+ LOG.info("Stopped the cluster resource manager");
} catch (Throwable e) {
- log.error("Exception while stopping cluster resource manager", e);
+ LOG.error("Exception while stopping cluster resource manager", e);
}
- log.info("Stopped the container process manager");
+ LOG.info("Stopped the container process manager");
}
public void onResourceAllocated(SamzaResource resource) {
- log.info("Container ID: {} allocated from RM on host: {}",
resource.getContainerId(), resource.getHost());
+ LOG.info("Container ID: {} allocated from RM on host: {}",
resource.getContainerId(), resource.getHost());
containerAllocator.addResource(resource);
}
@@ -313,7 +320,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
String hostName = null;
for (Map.Entry<String, SamzaResource> entry:
state.runningProcessors.entrySet()) {
if
(entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
- log.info("Container ID: {} matched running Processor ID: {} on host:
{}", containerId, entry.getKey(), entry.getValue().getHost());
+ LOG.info("Container ID: {} matched running Processor ID: {} on host:
{}", containerId, entry.getKey(), entry.getValue().getHost());
processorId = entry.getKey();
hostName = entry.getValue().getHost();
@@ -321,7 +328,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
}
}
if (processorId == null) {
- log.info("No running Processor ID found for Container ID: {} with
Status: {}. Ignoring redundant notification.", containerId,
resourceStatus.toString());
+ LOG.info("No running Processor ID found for Container ID: {} with
Status: {}. Ignoring redundant notification.", containerId,
resourceStatus.toString());
state.redundantNotifications.incrementAndGet();
if (resourceStatus.getExitCode() != SamzaResourceStatus.SUCCESS) {
@@ -336,7 +343,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
int exitStatus = resourceStatus.getExitCode();
switch (exitStatus) {
case SamzaResourceStatus.SUCCESS:
- log.info("Container ID: {} for Processor ID: {} completed
successfully.", containerId, processorId);
+ LOG.info("Container ID: {} for Processor ID: {} completed
successfully.", containerId, processorId);
state.completedProcessors.incrementAndGet();
@@ -344,7 +351,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
processorFailures.remove(processorId);
if (state.completedProcessors.get() == state.processorCount.get()) {
- log.info("Setting job status to SUCCEEDED since all containers have
been marked as completed.");
+ LOG.info("Setting job status to SUCCEEDED since all containers have
been marked as completed.");
state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
}
break;
@@ -352,7 +359,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
case SamzaResourceStatus.DISK_FAIL:
case SamzaResourceStatus.ABORTED:
case SamzaResourceStatus.PREEMPTED:
- log.info("Container ID: {} for Processor ID: {} was released with an
exit code: {}. This means that " +
+ LOG.info("Container ID: {} for Processor ID: {} was released with an
exit code: {}. This means that " +
"the container was killed by YARN, either due to being
released by the application master " +
"or being 'lost' due to node failures etc. or due to
preemption by the RM." +
"Requesting a new container for the processor.",
@@ -368,86 +375,15 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
state.jobHealthy.set(false);
// handle container stop due to node fail
- this.handleContainerStop(processorId, resourceStatus.getContainerId(),
ResourceRequestState.ANY_HOST, exitStatus);
+ handleContainerStop(processorId, resourceStatus.getContainerId(),
ResourceRequestState.ANY_HOST, exitStatus, Duration.ZERO);
break;
default:
- log.info("Container ID: {} for Processor ID: {} failed with exit code:
{}.", containerId, processorId, exitStatus);
-
- state.failedContainers.incrementAndGet();
- state.failedContainersStatus.put(containerId, resourceStatus);
- state.jobHealthy.set(false);
-
- state.neededProcessors.incrementAndGet();
- // Find out previously running container location
- String lastSeenOn =
state.jobModelManager.jobModel().getContainerToHostValue(processorId,
SetContainerHostMapping.HOST_KEY);
- if (!hostAffinityEnabled || lastSeenOn == null) {
- lastSeenOn = ResourceRequestState.ANY_HOST;
- }
- log.info("Container ID: {} for Processor ID: {} was last seen on host
{}.", containerId, processorId, lastSeenOn);
- // A container failed for an unknown reason. Let's check to see if
- // we need to shutdown the whole app master if too many container
- // failures have happened. The rules for failing are that the
- // failure count for a task group id must be > the configured retry
- // count, and the last failure (the one prior to this one) must have
- // happened less than retry window ms ago. If retry count is set to
- // 0, the app master will fail on any container failure. If the
- // retry count is set to a number < 0, a container failure will
- // never trigger an app master failure.
- int retryCount = clusterManagerConfig.getContainerRetryCount();
- int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
-
- if (retryCount == 0) {
- log.error("Processor ID: {} (current Container ID: {}) failed, and
retry count is set to 0, " +
- "so shutting down the application master and marking the job as
failed.", processorId, containerId);
-
- jobFailureCriteriaMet = true;
- } else if (retryCount > 0) {
- int currentFailCount;
- long lastFailureMsDiff;
- if (processorFailures.containsKey(processorId)) {
- ProcessorFailure failure = processorFailures.get(processorId);
- currentFailCount = failure.getCount() + 1;
- lastFailureMsDiff = Instant.now().toEpochMilli() -
failure.getLastFailure();
- } else {
- currentFailCount = 1;
- lastFailureMsDiff = 0;
- }
-
- if (lastFailureMsDiff >= retryWindowMs) {
- log.info("Resetting failure count for Processor ID: {} back to 1,
since last failure " +
- "(for Container ID: {}) was outside the bounds of the retry
window.", processorId, containerId);
-
- // Reset counter back to 1, since the last failure for this
- // container happened outside the window boundary.
- currentFailCount = 1;
- }
-
- // if fail count is (1 initial failure + max retries) then fail job.
- if (currentFailCount > retryCount) {
- log.error("Processor ID: {} (current Container ID: {}) has failed
{} times, with last failure {} ms ago. " +
- "This is greater than retry count of {} and window of {}
ms, " +
- "so shutting down the application master and marking the
job as failed.",
- processorId, containerId, currentFailCount, lastFailureMsDiff,
retryCount, retryWindowMs);
-
- // We have too many failures, and we're within the window
- // boundary, so reset shut down the app master.
- jobFailureCriteriaMet = true;
- state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
- } else {
- log.info("Current failure count for Processor ID: {} is {}.",
processorId, currentFailCount);
- processorFailures.put(processorId, new
ProcessorFailure(currentFailCount, Instant.now().toEpochMilli()));
- }
- }
-
- if (!jobFailureCriteriaMet) {
- handleContainerStop(processorId, resourceStatus.getContainerId(),
lastSeenOn, exitStatus);
- }
-
+ onResourceCompletedWithUnknownStatus(resourceStatus, containerId,
processorId, exitStatus);
}
- if (this.diagnosticsManager.isDefined()) {
- this.diagnosticsManager.get().addProcessorStopEvent(processorId,
resourceStatus.getContainerId(), hostName, exitStatus);
+ if (diagnosticsManager.isDefined()) {
+ diagnosticsManager.get().addProcessorStopEvent(processorId,
resourceStatus.getContainerId(), hostName, exitStatus);
}
}
@@ -472,13 +408,13 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
// 1. Obtain the processor ID for the pending container on this resource.
String processorId = getPendingProcessorId(containerId);
- log.info("Successfully started Processor ID: {} on Container ID: {} on
host: {}",
+ LOG.info("Successfully started Processor ID: {} on Container ID: {} on
host: {}",
processorId, containerId, containerHost);
// 2. Remove the container from the pending buffer and add it to the
running buffer. Additionally, update the
// job-health metric.
if (processorId != null) {
- log.info("Moving Processor ID: {} on Container ID: {} on host: {} from
pending to running state.",
+ LOG.info("Moving Processor ID: {} on Container ID: {} on host: {} from
pending to running state.",
processorId, containerId, containerHost);
state.pendingProcessors.remove(processorId);
state.runningProcessors.put(processorId, resource);
@@ -487,7 +423,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
state.jobHealthy.set(true);
}
} else {
- log.warn("Did not find a pending Processor ID for Container ID: {} on
host: {}. " +
+ LOG.warn("Did not find a pending Processor ID for Container ID: {} on
host: {}. " +
"Ignoring invalid/redundant notification.", containerId,
containerHost);
}
}
@@ -499,23 +435,23 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
// 1. Obtain the pending Samza processor ID for this container ID.
String processorId = getPendingProcessorId(containerId);
- log.error("Launch failed for pending Processor ID: {} on Container ID: {}
on host: {} with exception: {}",
+ LOG.error("Launch failed for pending Processor ID: {} on Container ID: {}
on host: {} with exception: {}",
processorId, containerId, containerHost, t);
// 2. Release resources for containers that failed back to YARN
- log.info("Releasing un-startable Container ID: {} for pending Processor
ID: {}", containerId, processorId);
+ LOG.info("Releasing un-startable Container ID: {} for pending Processor
ID: {}", containerId, processorId);
clusterResourceManager.releaseResources(resource);
// 3. Re-request resources on ANY_HOST in case of launch failures on the
preferred host, if standby are not enabled
// otherwise calling standbyContainerManager
if (processorId != null && standbyContainerManager.isPresent()) {
-
this.standbyContainerManager.get().handleContainerLaunchFail(processorId,
containerId, containerAllocator);
+ standbyContainerManager.get().handleContainerLaunchFail(processorId,
containerId, containerAllocator);
} else if (processorId != null) {
- log.info("Falling back to ANY_HOST for Processor ID: {} since launch
failed for Container ID: {} on host: {}",
+ LOG.info("Falling back to ANY_HOST for Processor ID: {} since launch
failed for Container ID: {} on host: {}",
processorId, containerId, containerHost);
containerAllocator.requestResource(processorId,
ResourceRequestState.ANY_HOST);
} else {
- log.warn("Did not find a pending Processor ID for Container ID: {} on
host: {}. " +
+ LOG.warn("Did not find a pending Processor ID for Container ID: {} on
host: {}. " +
"Ignoring invalid/redundant notification.", containerId,
containerHost);
}
}
@@ -526,7 +462,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
*/
@Override
public void onError(Throwable e) {
- log.error("Exception occurred in callbacks in the Cluster Resource
Manager", e);
+ LOG.error("Exception occurred in callbacks in the Cluster Resource
Manager", e);
exceptionOccurred = e;
}
@@ -541,6 +477,114 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
}
/**
+ * Called within {@link #onResourceCompleted(SamzaResourceStatus)} for
unknown exit statuses. These exit statuses
+ * correspond to container completion other than container
run-to-completion, abort or preemption, or disk failure
+ * (e.g., detected by YARN's NM healthchecks).
+ * @param resourceStatus reported resource status.
+ * @param containerId container ID
+ * @param processorId processor ID (a.k.a. logical container ID)
+ * @param exitStatus exit status from the {@link
#onResourceCompleted(SamzaResourceStatus)} callback.
+ */
+ @VisibleForTesting
+ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus
resourceStatus, String containerId, String processorId,
+ int exitStatus) {
+ LOG.info("Container ID: {} for Processor ID: {} failed with exit code:
{}.", containerId, processorId, exitStatus);
+ Instant now = Instant.now();
+ state.failedContainers.incrementAndGet();
+ state.failedContainersStatus.put(containerId, resourceStatus);
+ state.jobHealthy.set(false);
+
+ state.neededProcessors.incrementAndGet();
+ // Find out previously running container location
+ String lastSeenOn =
state.jobModelManager.jobModel().getContainerToHostValue(processorId,
SetContainerHostMapping.HOST_KEY);
+ if (!hostAffinityEnabled || lastSeenOn == null) {
+ lastSeenOn = ResourceRequestState.ANY_HOST;
+ }
+ LOG.info("Container ID: {} for Processor ID: {} was last seen on host
{}.", containerId, processorId, lastSeenOn);
+ // A container failed for an unknown reason. Let's check to see if
+ // we need to shutdown the whole app master if too many container
+ // failures have happened. The rules for failing are that the
+ // failure count for a task group id must be > the configured retry
+ // count, and the last failure (the one prior to this one) must have
+ // happened less than retry window ms ago. If retry count is set to
+ // 0, the app master will fail on any container failure. If the
+ // retry count is set to a number < 0, a container failure will
+ // never trigger an app master failure.
+ int retryCount = clusterManagerConfig.getContainerRetryCount();
+ int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+ int currentFailCount;
+
+ if (retryCount == 0) {
+ LOG.error("Processor ID: {} (current Container ID: {}) failed, and retry
count is set to 0, " +
+ "so shutting down the application master and marking the job as
failed.", processorId, containerId);
+
+ jobFailureCriteriaMet = true;
+ } else if (retryCount > 0) {
+ long durationSinceLastRetryMs;
+ if (processorFailures.containsKey(processorId)) {
+ ProcessorFailure failure = processorFailures.get(processorId);
+ currentFailCount = failure.getCount() + 1;
+ Duration lastRetryDelay = getRetryDelay(processorId);
+ Instant retryAttemptedAt =
failure.getLastFailure().plus(lastRetryDelay);
+ durationSinceLastRetryMs = now.toEpochMilli() -
retryAttemptedAt.toEpochMilli();
+ if (durationSinceLastRetryMs < 0) {
+ // This should never happen without changes to the system clock or
time travel. Log a warning just in case.
+ LOG.warn("Last failure at: {} with a retry attempted at: {} which is
supposed to be before current time of: {}",
+ failure.getLastFailure(), retryAttemptedAt, now);
+ }
+ } else {
+ currentFailCount = 1;
+ durationSinceLastRetryMs = 0;
+ }
+
+ if (durationSinceLastRetryMs >= retryWindowMs) {
+ LOG.info("Resetting failure count for Processor ID: {} back to 1,
since last failure " +
+ "(for Container ID: {}) was outside the bounds of the retry
window.", processorId, containerId);
+
+ // Reset counter back to 1, since the last failure for this
+ // container happened outside the window boundary.
+ currentFailCount = 1;
+ }
+
+ // if fail count is (1 initial failure + max retries) then fail job.
+ if (currentFailCount > retryCount) {
+ LOG.error("Processor ID: {} (current Container ID: {}) has failed {}
times, with last failure {} ms ago. " +
+ "This is greater than retry count of {} and window of {} ms, "
+
+ "so shutting down the application master and marking the job
as failed.",
+ processorId, containerId, currentFailCount,
durationSinceLastRetryMs, retryCount, retryWindowMs);
+
+ // We have too many failures, and we're within the window
+ // boundary, so reset shut down the app master.
+ jobFailureCriteriaMet = true;
+ state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+ } else {
+ LOG.info("Current failure count for Processor ID: {} is {}.",
processorId, currentFailCount);
+ Duration retryDelay = Duration.ZERO;
+ if (!ResourceRequestState.ANY_HOST.equals(lastSeenOn) &&
currentFailCount == retryCount) {
+ // Add the preferred host last retry delay on the last retry
+ retryDelay =
Duration.ofMillis(clusterManagerConfig.getContainerPreferredHostLastRetryDelayMs());
+ }
+ processorFailures.put(processorId, new
ProcessorFailure(currentFailCount, now, retryDelay));
+ }
+ }
+
+ if (!jobFailureCriteriaMet) {
+ Duration retryDelay = getRetryDelay(processorId);
+ if (!retryDelay.isZero()) {
+ LOG.info("Adding a delay of: {} seconds on the last container retry
request for preferred host: {}",
+ retryDelay.getSeconds(), lastSeenOn);
+ }
+ handleContainerStop(processorId, resourceStatus.getContainerId(),
lastSeenOn, exitStatus, retryDelay);
+ }
+ }
+
+ private Duration getRetryDelay(String processorId) {
+ return processorFailures.containsKey(processorId)
+ ? processorFailures.get(processorId).getLastRetryDelay()
+ : Duration.ZERO;
+ }
+
+ /**
* Returns an instantiated {@link ResourceManagerFactory} from a {@link
ClusterManagerConfig}. The
* {@link ResourceManagerFactory} is used to return an implementation of a
{@link ClusterResourceManager}
*
@@ -555,7 +599,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
try {
factory = ReflectionUtil.getObj(classLoader,
containerManagerFactoryClass, ResourceManagerFactory.class);
} catch (Exception e) {
- log.error("Error creating the cluster resource manager.", e);
+ LOG.error("Error creating the cluster resource manager.", e);
throw new SamzaException(e);
}
return factory;
@@ -570,19 +614,19 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
private String getPendingProcessorId(String resourceId) {
for (Map.Entry<String, SamzaResource> entry:
state.pendingProcessors.entrySet()) {
if (entry.getValue().getContainerId().equals(resourceId)) {
- log.info("Container ID: {} matched pending Processor ID: {} on host:
{}", resourceId, entry.getKey(), entry.getValue().getHost());
+ LOG.info("Container ID: {} matched pending Processor ID: {} on host:
{}", resourceId, entry.getKey(), entry.getValue().getHost());
return entry.getKey();
}
}
return null;
}
- private void handleContainerStop(String processorId, String resourceID,
String preferredHost, int exitStatus) {
+ private void handleContainerStop(String processorId, String resourceID,
String preferredHost, int exitStatus, Duration preferredHostRetryDelay) {
if (standbyContainerManager.isPresent()) {
- standbyContainerManager.get().handleContainerStop(processorId,
resourceID, preferredHost, exitStatus, containerAllocator);
+ standbyContainerManager.get().handleContainerStop(processorId,
resourceID, preferredHost, exitStatus, containerAllocator,
preferredHostRetryDelay);
} else {
// If StandbyTasks are not enabled, we simply make a request for the
preferredHost
- containerAllocator.requestResource(processorId, preferredHost);
+ containerAllocator.requestResourceWithDelay(processorId, preferredHost,
preferredHostRetryDelay);
}
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 044f29c..177c473 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -18,9 +18,9 @@
*/
package org.apache.samza.clustermanager;
-
-import java.util.Optional;
+import java.time.Instant;
import java.util.Map;
+import java.util.Optional;
import org.apache.samza.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +69,13 @@ public class HostAwareContainerAllocator extends
AbstractContainerAllocator {
*/
@Override
public void assignResourceRequests() {
- while (hasPendingRequest()) {
- SamzaResourceRequest request = peekPendingRequest();
+ for (Optional<SamzaResourceRequest> requestOptional =
peekReadyPendingRequest();
+ requestOptional.isPresent();
+ requestOptional = peekReadyPendingRequest()) {
+ SamzaResourceRequest request = requestOptional.get();
String processorId = request.getProcessorId();
String preferredHost = request.getPreferredHost();
- long requestCreationTime = request.getRequestTimestampMs();
+ Instant requestCreationTime = request.getRequestTimestamp();
log.info("Handling assignment request for Processor ID: {} on preferred
host: {}.", processorId, preferredHost);
if (hasAllocatedResource(preferredHost)) {
@@ -144,11 +146,11 @@ public class HostAwareContainerAllocator extends
AbstractContainerAllocator {
* @return true if request has expired
*/
private boolean isRequestExpired(SamzaResourceRequest request) {
- long currTime = System.currentTimeMillis();
- boolean requestExpired = currTime - request.getRequestTimestampMs() >
requestTimeout;
+ long currTime = Instant.now().toEpochMilli();
+ boolean requestExpired = currTime -
request.getRequestTimestamp().toEpochMilli() > requestTimeout;
if (requestExpired) {
log.info("Request for Processor ID: {} on host: {} with creation time:
{} has expired at current time: {} after timeout: {} ms.",
- request.getProcessorId(), request.getPreferredHost(),
request.getRequestTimestampMs(), currTime, requestTimeout);
+ request.getProcessorId(), request.getPreferredHost(),
request.getRequestTimestamp(), currTime, requestTimeout);
}
return requestExpired;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
index e1b0c93..8ab00f9 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ProcessorFailure.java
@@ -19,11 +19,15 @@
package org.apache.samza.clustermanager;
+import java.time.Duration;
+import java.time.Instant;
+
+
/**
* A ProcessorFailure instance encapsulates information related to a Samza
processor failure.
* It keeps track of the time of the last failure, the number of failures.
*/
-public class ProcessorFailure {
+class ProcessorFailure {
/**
* Number of times a processor has failed
*/
@@ -31,18 +35,28 @@ public class ProcessorFailure {
/**
* Latest failure time of the processor
*/
- private final Long lastFailure;
+ private final Instant lastFailure;
- public ProcessorFailure(int count, Long lastFailure) {
+ /**
+ * The delay added to the retry attempt of the last failure
+ */
+ private final Duration lastRetryDelay;
+
+ public ProcessorFailure(int count, Instant lastFailure, Duration
lastRetryDelay) {
this.count = count;
this.lastFailure = lastFailure;
+ this.lastRetryDelay = lastRetryDelay;
}
public int getCount() {
return count;
}
- public Long getLastFailure() {
+ public Instant getLastFailure() {
return lastFailure;
}
+
+ public Duration getLastRetryDelay() {
+ return lastRetryDelay;
+ }
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 8da3337..2e4bcdb 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -18,17 +18,19 @@
*/
package org.apache.samza.clustermanager;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link ResourceRequestState} maintains the state variables for all the
resource requests and the allocated resources returned
@@ -46,16 +48,27 @@ public class ResourceRequestState {
* Maintain a map of hostname to a list of resources allocated on this host
*/
private final Map<String, List<SamzaResource>> allocatedResources = new
HashMap<>();
+
/**
* Represents the queue of resource requests made by the {@link
ContainerProcessManager}
*/
- private final PriorityQueue<SamzaResourceRequest> requestsQueue = new
PriorityQueue<SamzaResourceRequest>();
+ private final PriorityQueue<SamzaResourceRequest> requestsQueue = new
PriorityQueue<>();
+
+ /**
+ * Represents the queue of delayed resource requests made by the {@link
ContainerProcessManager}
+ * The difference between requestsQueue and delayedRequestsQueue is that,
any request present the requestsQueue has
+ * been sent out to the cluster resource manager, while requests in the
delayedRequestsQueue will be sent to the
+ * cluster resource manager only when their delay reaches 0.
+ */
+ private final DelayedRequestQueue delayedRequestsQueue = new
DelayedRequestQueue();
+
/**
* Maintain a map of hostname to the number of requests made for resources
on this host
* This state variable is used to look-up whether an allocated resource on a
host was ever requested in the past.
* This map is not updated when host-affinity is not enabled
*/
private final Map<String, AtomicInteger> hostRequestCounts = new HashMap<>();
+
/**
* Indicates whether host-affinity is enabled or not
*/
@@ -71,34 +84,20 @@ public class ResourceRequestState {
}
/**
- * Enqueues a {@link SamzaResourceRequest} to be sent to a {@link
ClusterResourceManager}.
+ * Sends a {@link SamzaResourceRequest} to the {@link
ClusterResourceManager} and queues the {@link SamzaResourceRequest}
+ * to be matched with the {@link SamzaResource} when it is provisioned.
*
- * @param request {@link SamzaResourceRequest} to be queued
+ * @param request {@link SamzaResourceRequest} to be sent and queued.
*/
public void addResourceRequest(SamzaResourceRequest request) {
synchronized (lock) {
- requestsQueue.add(request);
- String preferredHost = request.getPreferredHost();
-
- // if host affinity is enabled, update state.
- if (hostAffinityEnabled) {
- //increment # of requests on the host.
- if (hostRequestCounts.containsKey(preferredHost)) {
- hostRequestCounts.get(preferredHost).incrementAndGet();
- } else {
- hostRequestCounts.put(preferredHost, new AtomicInteger(1));
- }
- /**
- * The following is important to correlate allocated resource data
with the requestsQueue made before. If
- * the preferredHost is requested for the first time, the state should
reflect that the allocatedResources
- * list is empty and NOT null.
- */
-
- if (!allocatedResources.containsKey(preferredHost)) {
- allocatedResources.put(preferredHost, new ArrayList<>());
- }
+ if (request.getRequestTimestamp().isAfter(Instant.now())) {
+ delayedRequestsQueue.add(request);
+ return;
}
- manager.requestResources(request);
+
+ // Send request immediately if the request timestamp is not in the
future.
+ sendResourceRequest(request);
}
}
@@ -110,6 +109,7 @@ public class ResourceRequestState {
public void cancelResourceRequest(SamzaResourceRequest request) {
log.info("Canceling resource request for Processor ID: {} on host: {}",
request.getProcessorId(), request.getPreferredHost());
synchronized (lock) {
+ delayedRequestsQueue.remove(request);
requestsQueue.remove(request);
if (hostAffinityEnabled) {
// assignedHost may not always be the preferred host.
@@ -206,6 +206,22 @@ public class ResourceRequestState {
}
/**
+ * Sends the {@link SamzaResourceRequest}s in the delayed requests queue
that have expired.
+ * @return number of delayed requests sent.
+ */
+ public int sendPendingDelayedResourceRequests() {
+ synchronized (lock) {
+ int numMoved = 0;
+ Instant now = Instant.now();
+ while (!delayedRequestsQueue.isEmpty() &&
delayedRequestsQueue.peek().getRequestTimestamp().isBefore(now)) {
+ sendResourceRequest(delayedRequestsQueue.poll());
+ numMoved++;
+ }
+ return numMoved;
+ }
+ }
+
+ /**
* If requestQueue is empty, all extra resources in the buffer should be
released and update the entire system's state
* Needs to be synchronized because it is modifying shared state buffers
* @return the number of resources released.
@@ -275,6 +291,36 @@ public class ResourceRequestState {
}
}
+ /**
+ * Sends the request to the {@link ClusterResourceManager} while queuing the
request to be matched with the returned
+ * {@link SamzaResource}. Caller must call this in a synchronized block.
+ * @param request to be sent.
+ */
+ @VisibleForTesting
+ void sendResourceRequest(SamzaResourceRequest request) {
+ requestsQueue.add(request);
+ String preferredHost = request.getPreferredHost();
+
+ // if host affinity is enabled, update state.
+ if (hostAffinityEnabled) {
+ //increment # of requests on the host.
+ if (hostRequestCounts.containsKey(preferredHost)) {
+ hostRequestCounts.get(preferredHost).incrementAndGet();
+ } else {
+ hostRequestCounts.put(preferredHost, new AtomicInteger(1));
+ }
+ /**
+ * The following is important to correlate allocated resource data with
the requestsQueue made before. If
+ * the preferredHost is requested for the first time, the state should
reflect that the allocatedResources
+ * list is empty and NOT null.
+ */
+
+ if (!allocatedResources.containsKey(preferredHost)) {
+ allocatedResources.put(preferredHost, new ArrayList<>());
+ }
+ }
+ manager.requestResources(request);
+ }
/**
* Releases all allocated resources for the specified host.
@@ -359,7 +405,7 @@ public class ResourceRequestState {
*/
public SamzaResourceRequest peekPendingRequest() {
synchronized (lock) {
- return this.requestsQueue.peek();
+ return requestsQueue.peek();
}
}
@@ -369,10 +415,19 @@ public class ResourceRequestState {
*/
public int numPendingRequests() {
synchronized (lock) {
- return this.requestsQueue.size();
+ return requestsQueue.size();
}
}
+ /**
+ * Returns the number of delayed SamzaResource requests in the queue.
+ * @return the number of delayed requests
+ */
+ public int numDelayedRequests() {
+ synchronized (lock) {
+ return delayedRequestsQueue.size();
+ }
+ }
/**
* Returns the list of resources allocated on a given host. If no resources
were ever allocated on
@@ -393,7 +448,19 @@ public class ResourceRequestState {
}
// Package private, used only in tests.
+ @VisibleForTesting
Map<String, AtomicInteger> getHostRequestCounts() {
return Collections.unmodifiableMap(hostRequestCounts);
}
+
+ @VisibleForTesting
+ DelayedRequestQueue getDelayedRequestsQueue() {
+ return delayedRequestsQueue;
+ }
+
+ static class DelayedRequestQueue extends PriorityQueue<SamzaResourceRequest>
{
+ DelayedRequestQueue() {
+ super(Comparator.comparingLong(request ->
request.getRequestTimestamp().toEpochMilli()));
+ }
+ }
}
\ No newline at end of file
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 78a9dcc..5ca8b24 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -19,11 +19,11 @@
package org.apache.samza.clustermanager;
+import java.time.Instant;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.UUID;
-
/**
* Specification of a Request for resources from a ClusterResourceManager. A
* resource request currently includes cpu cores and memory in MB. A preferred
host
@@ -60,24 +60,28 @@ public class SamzaResourceRequest implements
Comparable<SamzaResourceRequest> {
/**
* The timestamp in millis when the request was created.
*/
- private final long requestTimestampMs;
+ private final Instant requestTimestamp;
public SamzaResourceRequest(int numCores, int memoryMB, String
preferredHost, String processorId) {
+ this(numCores, memoryMB, preferredHost, processorId, Instant.now());
+ }
+
+ public SamzaResourceRequest(int numCores, int memoryMB, String
preferredHost, String processorId, Instant requestTimestamp) {
this.numCores = numCores;
this.memoryMB = memoryMB;
this.preferredHost = preferredHost;
this.requestId = UUID.randomUUID().toString();
this.processorId = processorId;
- this.requestTimestampMs = System.currentTimeMillis();
- log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at
time: {} with Request ID: {}", this.processorId, this.preferredHost,
this.requestTimestampMs, this.requestId);
+ this.requestTimestamp = requestTimestamp;
+ log.info("SamzaResourceRequest created for Processor ID: {} on host: {} at
time: {} with Request ID: {}", this.processorId, this.preferredHost,
this.requestTimestamp, this.requestId);
}
public String getProcessorId() {
return processorId;
}
- public long getRequestTimestampMs() {
- return requestTimestampMs;
+ public Instant getRequestTimestamp() {
+ return requestTimestamp;
}
public String getRequestId() {
@@ -104,29 +108,34 @@ public class SamzaResourceRequest implements
Comparable<SamzaResourceRequest> {
", preferredHost='" + preferredHost + '\'' +
", requestId='" + requestId + '\'' +
", processorId=" + processorId +
- ", requestTimestampMs=" + requestTimestampMs +
+ ", requestTimestampMs=" + requestTimestamp +
'}';
}
/**
* Requests are ordered by the processor type and the time at which they
were created.
- * Active processors take precedence over standby processors, regardless of
timestamp.
+ * Requests with timestamps in the future for retries take less precedence
than timestamps in the past or current.
+ * Otherwise, active processors take precedence over standby processors,
regardless of timestamp.
* @param o the other
*/
@Override
public int compareTo(SamzaResourceRequest o) {
- if (!StandbyTaskUtil.isStandbyContainer(this.processorId) &&
StandbyTaskUtil.isStandbyContainer(o.processorId)) {
+ if (!StandbyTaskUtil.isStandbyContainer(processorId) &&
StandbyTaskUtil.isStandbyContainer(o.processorId)) {
return -1;
}
- if (StandbyTaskUtil.isStandbyContainer(this.processorId) &&
!StandbyTaskUtil.isStandbyContainer(o.processorId)) {
+ if (StandbyTaskUtil.isStandbyContainer(processorId) &&
!StandbyTaskUtil.isStandbyContainer(o.processorId)) {
return 1;
}
- if (this.requestTimestampMs < o.requestTimestampMs)
+ if (requestTimestamp.isBefore(o.requestTimestamp)) {
return -1;
- if (this.requestTimestampMs > o.requestTimestampMs)
+ }
+
+ if (requestTimestamp.isAfter(o.requestTimestamp)) {
return 1;
+ }
+
return 0;
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 8f6d675..0a114ab 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -82,10 +83,10 @@ public class StandbyContainerManager {
* @param containerAllocator the container allocator
*/
public void handleContainerStop(String containerID, String resourceID,
String preferredHost, int exitStatus,
- AbstractContainerAllocator containerAllocator) {
+ AbstractContainerAllocator containerAllocator, Duration
preferredHostRetryDelay) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
- handleStandbyContainerStop(containerID, resourceID, preferredHost,
containerAllocator);
+ handleStandbyContainerStop(containerID, resourceID, preferredHost,
containerAllocator, preferredHostRetryDelay);
} else {
// initiate failover for the active container based on the exitStatus
switch (exitStatus) {
@@ -98,7 +99,8 @@ public class StandbyContainerManager {
// if this request expires, we can do a failover -- select a standby
to stop & start the active on standby's host
default:
log.info("Requesting resource for active-container {} on host {}",
containerID, preferredHost);
- SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(containerID, preferredHost);
+ SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequestWithDelay(containerID, preferredHost,
preferredHostRetryDelay);
+
FailoverMetadata failoverMetadata =
registerActiveContainerFailure(containerID, resourceID);
failoverMetadata.recordResourceRequest(resourceRequest);
containerAllocator.issueResourceRequest(resourceRequest);
@@ -135,7 +137,7 @@ public class StandbyContainerManager {
* @param preferredHost Preferred host of the standby container
*/
private void handleStandbyContainerStop(String standbyContainerID, String
resourceID, String preferredHost,
- AbstractContainerAllocator containerAllocator) {
+ AbstractContainerAllocator containerAllocator, Duration
preferredHostRetryDelay) {
// if this standbyContainerResource was stopped for a failover, we will
find a metadata entry
Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
this.checkIfUsedForFailover(resourceID);
@@ -149,7 +151,7 @@ public class StandbyContainerManager {
// request standbycontainer's host for active-container
SamzaResourceRequest resourceRequestForActive =
- containerAllocator.getResourceRequest(activeContainerID,
standbyContainerHostname);
+ containerAllocator.getResourceRequestWithDelay(activeContainerID,
standbyContainerHostname, preferredHostRetryDelay);
// record the resource request, before issuing it to avoid race with
allocation-thread
failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
@@ -159,7 +161,7 @@ public class StandbyContainerManager {
} else {
log.info("Issuing request for standby container {} on host {}, since
this is not for a failover",
standbyContainerID, preferredHost);
- containerAllocator.requestResource(standbyContainerID, preferredHost);
+ containerAllocator.requestResourceWithDelay(standbyContainerID,
preferredHost, preferredHostRetryDelay);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index eda1be8..18d3954 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -20,6 +20,7 @@
package org.apache.samza.config;
+import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +88,17 @@ public class ClusterManagerConfig extends MapConfig {
*/
public static final String CONTAINER_RETRY_COUNT =
"yarn.container.retry.count";
public static final String CLUSTER_MANAGER_CONTAINER_RETRY_COUNT =
"cluster-manager.container.retry.count";
- private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+ public static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+
+ /**
+ * Maximum delay in milliseconds for the last container retry
+ */
+ public static final String
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
+ "cluster-manager.container.preferred-host.last.retry.delay.ms";
+ private static final long
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA =
+ Duration.ofSeconds(1).toMillis();
+ private static final long
DEFAULT_CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS =
+ Duration.ofMinutes(6).toMillis() +
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
/**
* The cluster managed job coordinator sleeps for a configurable time before
checking again for termination.
@@ -181,6 +192,14 @@ public class ClusterManagerConfig extends MapConfig {
}
}
+ public long getContainerPreferredHostLastRetryDelayMs() {
+ if
(containsKey(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS)) {
+ return
getLong(CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS) +
CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_RETRY_DELAY_CLOCK_SKEW_DELTA;
+ } else {
+ return
DEFAULT_CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS;
+ }
+ }
+
public int getContainerRetryWindowMs() {
if (containsKey(CLUSTER_MANAGER_RETRY_WINDOW_MS)) {
return getInt(CLUSTER_MANAGER_RETRY_WINDOW_MS);
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 39a767d..0f90f92 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -19,6 +19,9 @@
package org.apache.samza.clustermanager;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
@@ -30,10 +33,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -157,15 +156,16 @@ public class TestContainerAllocator {
assertNotNull(requestState);
- assertEquals(requestState.numPendingRequests(), 4);
+ assertEquals(4, requestState.numPendingRequests());
+ assertEquals(0, requestState.numDelayedRequests());
// If host-affinty is not enabled, it doesn't update the requestMap
assertNotNull(requestState.getHostRequestCounts());
- assertEquals(requestState.getHostRequestCounts().keySet().size(), 0);
+ assertEquals(0, requestState.getHostRequestCounts().keySet().size());
assertNotNull(state);
- assertEquals(state.anyHostRequests.get(), 4);
- assertEquals(state.preferredHostRequests.get(), 0);
+ assertEquals(4, state.anyHostRequests.get());
+ assertEquals(0, state.preferredHostRequests.get());
}
/**
@@ -184,11 +184,12 @@ public class TestContainerAllocator {
assertNotNull(requestState);
- assertTrue(requestState.numPendingRequests() == 4);
+ assertEquals(4, requestState.numPendingRequests());
+ assertEquals(0, requestState.numDelayedRequests());
// If host-affinty is not enabled, it doesn't update the requestMap
assertNotNull(requestState.getHostRequestCounts());
- assertTrue(requestState.getHostRequestCounts().keySet().size() == 0);
+ assertEquals(0, requestState.getHostRequestCounts().keySet().size());
}
/**
@@ -213,6 +214,7 @@ public class TestContainerAllocator {
// Test that state is cleaned up
assertEquals(0, requestState.numPendingRequests());
+ assertEquals(0, requestState.numDelayedRequests());
assertEquals(0, requestState.getHostRequestCounts().size());
assertNull(requestState.getResourcesOnAHost("abc"));
assertNull(requestState.getResourcesOnAHost("def"));
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 99b6bdd..997adb1 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
+import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@@ -49,7 +50,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestContainerProcessManager {
@@ -80,15 +89,13 @@ public class TestContainerProcessManager {
}
private Config getConfigWithHostAffinity() {
- Map<String, String> map = new HashMap<>();
- map.putAll(config);
- map.put("job.host-affinity.enabled", "true");
- return new MapConfig(map);
+ return getConfigWithHostAffinityAndRetries(true, 1);
}
- private Config getConfigWithRetries(int maxRetries) {
+ private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity,
int maxRetries) {
Map<String, String> map = new HashMap<>();
map.putAll(config);
+ map.put("job.host-affinity.enabled", String.valueOf(withHostAffinity));
map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_RETRY_COUNT,
String.valueOf(maxRetries));
return new MapConfig(map);
}
@@ -117,7 +124,7 @@ public class TestContainerProcessManager {
server = new MockHttpServer("/", 7777, null, new
ServletHolder(DefaultServlet.class));
}
- private Field getPrivateFieldFromTaskManager(String fieldName,
ContainerProcessManager object) throws Exception {
+ private Field getPrivateFieldFromCpm(String fieldName,
ContainerProcessManager object) throws Exception {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field;
@@ -135,11 +142,11 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ContainerProcessManager cpm =
+ buildContainerProcessManager(new ClusterManagerConfig(new
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
AbstractContainerAllocator allocator =
- (AbstractContainerAllocator)
getPrivateFieldFromTaskManager("containerAllocator",
taskManager).get(taskManager);
+ (AbstractContainerAllocator)
getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
assertEquals(ContainerAllocator.class, allocator.getClass());
// Asserts that samza exposed container configs is honored by allocator
thread
assertEquals(500, allocator.containerMemoryMb);
@@ -153,8 +160,8 @@ public class TestContainerProcessManager {
state = new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host1")));
callback = new MockClusterResourceManagerCallback();
clusterResourceManager = new MockClusterResourceManager(callback, state);
- taskManager = new ContainerProcessManager(
- new MapConfig(conf),
+ cpm = new ContainerProcessManager(
+ new ClusterManagerConfig(new MapConfig(conf)),
state,
new MetricsRegistryMap(),
clusterResourceManager,
@@ -163,7 +170,7 @@ public class TestContainerProcessManager {
);
allocator =
- (AbstractContainerAllocator)
getPrivateFieldFromTaskManager("containerAllocator",
taskManager).get(taskManager);
+ (AbstractContainerAllocator)
getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
// Asserts that samza exposed container configs is honored by allocator
thread
assertEquals(500, allocator.containerMemoryMb);
@@ -176,24 +183,26 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
ClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
+
+ ContainerProcessManager cpm =
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.empty());
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
+ getPrivateFieldFromCpm("containerAllocator", cpm).set(cpm, allocator);
CountDownLatch latch = new CountDownLatch(1);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, new Thread() {
+ getPrivateFieldFromCpm("allocatorThread", cpm).set(cpm, new Thread() {
public void run() {
isRunning = true;
latch.countDown();
}
});
- taskManager.start();
+ cpm.start();
if (!latch.await(2, TimeUnit.SECONDS)) {
Assert.fail("timed out waiting for the latch to expire");
@@ -206,7 +215,7 @@ public class TestContainerProcessManager {
assertEquals(1, state.neededProcessors.get());
assertEquals(1, allocator.requestedContainers);
- taskManager.stop();
+ cpm.stop();
}
@Test
@@ -215,66 +224,65 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
- taskManager.start();
+ ContainerProcessManager cpm =
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.empty());
+ cpm.start();
- Thread allocatorThread = (Thread)
getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
+ Thread allocatorThread = (Thread)
getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm);
assertTrue(allocatorThread.isAlive());
- taskManager.stop();
+ cpm.stop();
assertFalse(allocatorThread.isAlive());
}
/**
- * Test Task Manager should stop when all containers finish
+ * Test Container Process Manager should stop when all containers finish
*/
@Test
- public void testTaskManagerShouldStopWhenContainersFinish() throws Exception
{
+ public void testCpmShouldStopWhenContainersFinish() throws Exception {
Config conf = getConfig();
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
-
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
-
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm =
+ spy(buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator)));
// start triggers a request
- taskManager.start();
+ cpm.start();
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
- taskManager.onResourceAllocated(container);
+ cpm.onResourceAllocated(container);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container);
- assertFalse(taskManager.shouldShutdown());
+ cpm.onStreamProcessorLaunchSuccess(container);
+ assertFalse(cpm.shouldShutdown());
- taskManager.onResourceCompleted(new SamzaResourceStatus("id0",
"diagnostics", SamzaResourceStatus.SUCCESS));
- assertTrue(taskManager.shouldShutdown());
+ cpm.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics",
SamzaResourceStatus.SUCCESS));
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
+ assertTrue(cpm.shouldShutdown());
}
/**
- * Test Task Manager should request a new container when a task fails with
unknown exit code
+ * Test Container Process Manager should request a new container when a task
fails with unknown exit code
* When host-affinity is not enabled, it will always request for ANY_HOST
*/
@Test
@@ -283,81 +291,97 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
-
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
-
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm = spy(
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator)));
// start triggers a request
- taskManager.start();
+ cpm.start();
- assertFalse(taskManager.shouldShutdown());
+ verify(clusterManagerConfig,
never()).getContainerPreferredHostLastRetryDelayMs();
+ verify(cpm, never()).onResourceCompletedWithUnknownStatus(any(),
anyString(), anyString(), anyInt());
+ assertFalse(cpm.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
- taskManager.onResourceAllocated(container);
+ cpm.onResourceAllocated(container);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container);
+ cpm.onStreamProcessorLaunchSuccess(container);
// Create first container failure
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
+ SamzaResourceStatus samzaResourceStatus = new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1);
+ cpm.onResourceCompleted(samzaResourceStatus);
+
// The above failure should trigger a container request
+ verify(cpm).onResourceCompletedWithUnknownStatus(eq(samzaResourceStatus),
eq(container.getContainerId()), eq("0"), eq(1));
+ verify(clusterManagerConfig,
never()).getContainerPreferredHostLastRetryDelayMs();
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
assertEquals(ResourceRequestState.ANY_HOST,
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(2, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
- taskManager.onResourceAllocated(container);
+ cpm.onResourceAllocated(container);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container);
+ cpm.onStreamProcessorLaunchSuccess(container);
assertTrue(state.jobHealthy.get());
// Create a second failure
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
-
+ cpm.onResourceCompleted(samzaResourceStatus);
// The above failure should trigger a job shutdown because our retry count
is set to 1
+ verify(cpm,
times(2)).onResourceCompletedWithUnknownStatus(eq(samzaResourceStatus),
eq(container.getContainerId()), eq("0"), eq(1));
+ verify(clusterManagerConfig,
never()).getContainerPreferredHostLastRetryDelayMs();
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
assertEquals(2, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
assertFalse(state.jobHealthy.get());
- assertTrue(taskManager.shouldShutdown());
+ assertTrue(cpm.shouldShutdown());
assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);
- taskManager.stop();
+ cpm.stop();
+ }
+
+ /**
+ * Test scenario where a container fails multiple times but failures are
more than retryWindow apart without host affinity
+ * @throws Exception
+ */
+ @Test
+ public void
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity()
throws Exception {
+
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(false);
}
/**
- * Test scenario where a container fails multiple times but failures are
more than retryWindow apart.
+ * Test scenario where a container fails multiple times but failures are
more than retryWindow apart with host affinity
* @throws Exception
*/
@Test
- public void
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode() throws
Exception {
+ public void
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCodeWithHostAffinity()
throws Exception {
+ testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(true);
+ }
+
+ private void
testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCode(boolean
withHostAffinity) throws Exception {
int maxRetries = 3;
String processorId = "0";
- ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(getConfigWithRetries(maxRetries));
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity,
maxRetries));
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
@@ -368,7 +392,7 @@ public class TestContainerProcessManager {
state);
ContainerProcessManager cpm =
- buildContainerProcessManager(new MapConfig(clusterManagerConfig),
state, clusterResourceManager, Optional.of(allocator));
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator));
// start triggers a request
cpm.start();
@@ -387,7 +411,8 @@ public class TestContainerProcessManager {
// Mock 2nd failure exceeding retry window.
int longWindow = clusterManagerConfig.getContainerRetryWindowMs() + 10;
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1,
Instant.now().minusMillis(longWindow).toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1,
Instant.now().minusMillis(longWindow), Duration.ZERO));
+ assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -401,7 +426,7 @@ public class TestContainerProcessManager {
cpm.onStreamProcessorLaunchSuccess(container);
// Mock 3rd failure exceeding retry window.
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2,
Instant.now().minusMillis(longWindow).toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2,
Instant.now().minusMillis(longWindow), Duration.ZERO));
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -415,7 +440,7 @@ public class TestContainerProcessManager {
cpm.onStreamProcessorLaunchSuccess(container);
// Mock 4th failure exceeding retry window.
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3,
Instant.now().minusMillis(longWindow).toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3,
Instant.now().minusMillis(longWindow), Duration.ZERO));
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());
@@ -424,10 +449,19 @@ public class TestContainerProcessManager {
}
@Test
- public void
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode()
throws Exception {
+ public void
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithNoHostAffinity()
throws Exception {
+
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(false);
+ }
+
+ @Test
+ public void
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCodeWithHostAffinity()
throws Exception {
+
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(true);
+ }
+
+ private void
testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknownCode(boolean
withHostAffinity) throws Exception {
int maxRetries = 3;
String processorId = "0";
- ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(getConfigWithRetries(maxRetries));
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity,
maxRetries));
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
@@ -438,13 +472,14 @@ public class TestContainerProcessManager {
state);
ContainerProcessManager cpm =
- buildContainerProcessManager(new MapConfig(clusterManagerConfig),
state, clusterResourceManager, Optional.of(allocator));
+ buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator));
// start triggers a request
cpm.start();
assertFalse(cpm.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
SamzaResource container = new SamzaResource(1, 1024, "host1", "id0");
cpm.onResourceAllocated(container);
@@ -456,10 +491,13 @@ public class TestContainerProcessManager {
cpm.onStreamProcessorLaunchSuccess(container);
// Mock 2nd failure not exceeding retry window.
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1,
Instant.now().toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(1,
Instant.now(), Duration.ZERO));
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(2, cpm.getProcessorFailures().get(processorId).getCount());
+ assertFalse(cpm.shouldShutdown());
+ assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
cpm.onResourceAllocated(container);
@@ -470,24 +508,53 @@ public class TestContainerProcessManager {
cpm.onStreamProcessorLaunchSuccess(container);
// Mock 3rd failure not exceeding retry window.
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2,
Instant.now().toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(2,
Instant.now(), Duration.ZERO));
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount());
+ assertFalse(cpm.shouldShutdown());
+
+ if (withHostAffinity) {
+ assertEquals(0,
allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(1,
allocator.getContainerRequestState().numDelayedRequests());
+ } else {
+ assertEquals(1,
allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(0,
allocator.getContainerRequestState().numDelayedRequests());
+ }
cpm.onResourceAllocated(container);
- // Allow container to run and update state
+ if (withHostAffinity) {
+ if (allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
+ // With host affinity enabled the retry is a delayed request, so no container should have started before this wait times out.
+ fail("Expecting a delayed request so allocator callback should have
timed out waiting for a response.");
+ }
+
+ // For the sake of testing the mocked 4th failure below, send delayed
requests now.
+ SamzaResourceRequest request =
allocator.getContainerRequestState().getDelayedRequestsQueue().poll();
+ SamzaResourceRequest fastForwardRequest =
+ new SamzaResourceRequest(request.getNumCores(),
request.getMemoryMB(), request.getPreferredHost(), request.getProcessorId(),
Instant.now().minusSeconds(1));
+
allocator.getContainerRequestState().getDelayedRequestsQueue().add(fastForwardRequest);
+ int numSent =
allocator.getContainerRequestState().sendPendingDelayedResourceRequests();
+ assertEquals(1, numSent);
+ cpm.onResourceAllocated(container);
+ }
+
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
- fail("timed out waiting for the containers to start");
+ // The retry request was sent immediately (no host affinity) or fast-forwarded above (host affinity), so the container should start.
+ fail("Timed out waiting for the containers to start");
}
+
cpm.onStreamProcessorLaunchSuccess(container);
// Mock 4th failure not exceeding retry window.
- cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3,
Instant.now().toEpochMilli()));
+ cpm.getProcessorFailures().put(processorId, new ProcessorFailure(3,
Instant.now(), Duration.ZERO));
cpm.onResourceCompleted(new
SamzaResourceStatus(container.getContainerId(), "diagnostics", 1));
assertEquals(true, cpm.getJobFailureCriteriaMet()); // expecting failed
container
assertEquals(3, cpm.getProcessorFailures().get(processorId).getCount());
// count won't update on failure
+ assertTrue(cpm.shouldShutdown());
+ assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
+ assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());
cpm.stop();
}
@@ -496,29 +563,24 @@ public class TestContainerProcessManager {
public void testInvalidNotificationsAreIgnored() throws Exception {
Config conf = getConfig();
- Map<String, String> config = new HashMap<>();
- config.putAll(getConfig());
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
-
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm =
+ spy(buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator)));
// Start the task clusterResourceManager
- taskManager.start();
+ cpm.start();
SamzaResource container = new SamzaResource(1, 1000, "host1", "id1");
- taskManager.onResourceAllocated(container);
+ cpm.onResourceAllocated(container);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
@@ -526,11 +588,12 @@ public class TestContainerProcessManager {
}
// Create container failure - with ContainerExitStatus.DISKS_FAILED
- taskManager.onResourceCompleted(new
SamzaResourceStatus("invalidContainerID", "Disk failure",
SamzaResourceStatus.DISK_FAIL));
+ cpm.onResourceCompleted(new SamzaResourceStatus("invalidContainerID",
"Disk failure", SamzaResourceStatus.DISK_FAIL));
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
// The above failure should not trigger any container requests, since it
is for an invalid container ID
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertTrue(state.jobHealthy.get());
assertEquals(state.redundantNotifications.get(), 1);
}
@@ -549,7 +612,7 @@ public class TestContainerProcessManager {
state);
ContainerProcessManager manager =
- new ContainerProcessManager(config, state, new MetricsRegistryMap(),
clusterResourceManager,
+ new ContainerProcessManager(new ClusterManagerConfig(config), state,
new MetricsRegistryMap(), clusterResourceManager,
Optional.of(allocator), getClass().getClassLoader());
manager.start();
@@ -566,6 +629,7 @@ public class TestContainerProcessManager {
Map<String, String> config = new HashMap<>();
config.putAll(getConfigWithHostAffinity());
config.put("job.container.count", "2");
+ config.put("cluster-manager.container.retry.count", "2");
Config cfg = new MapConfig(config);
// 1. Request two containers on hosts - host1 and host2
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host1",
@@ -573,41 +637,38 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerProcessManager taskManager =
- buildContainerProcessManager(cfg, state, clusterResourceManager,
Optional.empty());
-
MockHostAwareContainerAllocator allocator = new
MockHostAwareContainerAllocator(
clusterResourceManager,
cfg,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm =
+ spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state,
clusterResourceManager, Optional.of(allocator)));
- taskManager.start();
- assertFalse(taskManager.shouldShutdown());
+ cpm.start();
+ assertFalse(cpm.shouldShutdown());
// 2. When the task manager starts, there should have been a pending
request on host1 and host2
assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
// 3. Allocate an extra resource on host1 and no resource on host2 yet.
SamzaResource resource1 = new SamzaResource(1, 1000, "host1", "id1");
SamzaResource resource2 = new SamzaResource(1, 1000, "host1", "id2");
- taskManager.onResourceAllocated(resource1);
- taskManager.onResourceAllocated(resource2);
+ cpm.onResourceAllocated(resource1);
+ cpm.onResourceAllocated(resource2);
// 4. Wait for the container to start on host1 and immediately fail
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(resource1);
+ cpm.onStreamProcessorLaunchSuccess(resource1);
assertEquals("host2",
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- taskManager.onResourceCompleted(new
SamzaResourceStatus(resource1.getContainerId(), "App Error", 1));
+ cpm.onResourceCompleted(new
SamzaResourceStatus(resource1.getContainerId(), "App Error", 1));
+
verify(cpm).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
assertEquals(2, allocator.getContainerRequestState().numPendingRequests());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(3, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
@@ -615,13 +676,13 @@ public class TestContainerProcessManager {
// 5. Do not allocate any further resource on host1, and verify that the
re-run of the container on host1 uses the
// previously allocated extra resource
SamzaResource resource3 = new SamzaResource(1, 1000, "host2", "id3");
- taskManager.onResourceAllocated(resource3);
+ cpm.onResourceAllocated(resource3);
if (!allocator.awaitContainersStart(2, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(resource2);
- taskManager.onStreamProcessorLaunchSuccess(resource3);
+ cpm.onStreamProcessorLaunchSuccess(resource2);
+ cpm.onStreamProcessorLaunchSuccess(resource3);
assertTrue(state.jobHealthy.get());
}
@@ -635,58 +696,57 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
-
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(new MapConfig(conf)));
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm =
+ spy(buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator)));
// Start the task manager
- taskManager.start();
- assertFalse(taskManager.shouldShutdown());
+ cpm.start();
+ assertFalse(cpm.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
- taskManager.onResourceAllocated(container1);
+ cpm.onResourceAllocated(container1);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container1);
+ cpm.onStreamProcessorLaunchSuccess(container1);
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
// Create container failure - with ContainerExitStatus.DISKS_FAILED
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container1.getContainerId(), "Disk failure",
SamzaResourceStatus.DISK_FAIL));
+ cpm.onResourceCompleted(new
SamzaResourceStatus(container1.getContainerId(), "Disk failure",
SamzaResourceStatus.DISK_FAIL));
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(2, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
assertEquals(ResourceRequestState.ANY_HOST,
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
- taskManager.onResourceAllocated(container2);
+ cpm.onResourceAllocated(container2);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container2);
+ cpm.onStreamProcessorLaunchSuccess(container2);
assertTrue(state.jobHealthy.get());
// Simulate a duplicate notification for container 1 with a different exit
code
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container1.getContainerId(), "Disk failure",
SamzaResourceStatus.PREEMPTED));
+ cpm.onResourceCompleted(new
SamzaResourceStatus(container1.getContainerId(), "Disk failure",
SamzaResourceStatus.PREEMPTED));
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
// assert that a duplicate notification does not change metrics (including
job health)
assertEquals(state.redundantNotifications.get(), 1);
assertEquals(2, clusterResourceManager.resourceRequests.size());
@@ -707,83 +767,88 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
-
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
+ ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(new MapConfig(config)));
MockContainerAllocator allocator = new MockContainerAllocator(
clusterResourceManager,
conf,
state);
- getPrivateFieldFromTaskManager("containerAllocator",
taskManager).set(taskManager, allocator);
- Thread thread = new Thread(allocator);
- getPrivateFieldFromTaskManager("allocatorThread",
taskManager).set(taskManager, thread);
+ ContainerProcessManager cpm =
+ spy(buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator)));
// Start the task clusterResourceManager
- taskManager.start();
- assertFalse(taskManager.shouldShutdown());
+ cpm.start();
+ assertFalse(cpm.shouldShutdown());
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
SamzaResource container1 = new SamzaResource(1, 1000, "host1", "id1");
- taskManager.onResourceAllocated(container1);
+ cpm.onResourceAllocated(container1);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
- taskManager.onStreamProcessorLaunchSuccess(container1);
+ cpm.onStreamProcessorLaunchSuccess(container1);
// Create container failure - with ContainerExitStatus.DISKS_FAILED
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container1.getContainerId(), "App error", 1));
+ SamzaResourceStatus resourceStatusOnAppError = new
SamzaResourceStatus(container1.getContainerId(), "App error", 1);
+ cpm.onResourceCompleted(resourceStatusOnAppError);
+
verify(cpm).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnAppError),
anyString(), anyString(), anyInt());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(2, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
assertEquals(ResourceRequestState.ANY_HOST,
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
SamzaResource container2 = new SamzaResource(1, 1000, "host1", "id2");
- taskManager.onResourceAllocated(container2);
+ cpm.onResourceAllocated(container2);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container2);
+ cpm.onStreamProcessorLaunchSuccess(container2);
// Create container failure - with ContainerExitStatus.PREEMPTED
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container2.getContainerId(), "Preemption",
SamzaResourceStatus.PREEMPTED));
+ SamzaResourceStatus resourceStatusOnPreemption =
+ new SamzaResourceStatus(container2.getContainerId(), "Preemption",
SamzaResourceStatus.PREEMPTED);
+ cpm.onResourceCompleted(resourceStatusOnPreemption);
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnPreemption),
anyString(), anyString(), anyInt());
assertEquals(3, clusterResourceManager.resourceRequests.size());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST,
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
SamzaResource container3 = new SamzaResource(1, 1000, "host1", "id3");
- taskManager.onResourceAllocated(container3);
+ cpm.onResourceAllocated(container3);
// Allow container to run and update state
if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) {
fail("timed out waiting for the containers to start");
}
- taskManager.onStreamProcessorLaunchSuccess(container3);
+ cpm.onStreamProcessorLaunchSuccess(container3);
// Create container failure - with ContainerExitStatus.ABORTED
- taskManager.onResourceCompleted(new
SamzaResourceStatus(container3.getContainerId(), "Aborted",
SamzaResourceStatus.ABORTED));
+ SamzaResourceStatus resourceStatusOnAborted =
+ new SamzaResourceStatus(container3.getContainerId(), "Aborted",
SamzaResourceStatus.ABORTED);
+ cpm.onResourceCompleted(resourceStatusOnAborted);
+ verify(cpm,
never()).onResourceCompletedWithUnknownStatus(eq(resourceStatusOnAborted),
anyString(), anyString(), anyInt());
// The above failure should trigger a container request
assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
assertEquals(4, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
- assertFalse(taskManager.shouldShutdown());
+ assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST,
allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());
- taskManager.stop();
+ cpm.stop();
}
@Test
@@ -793,20 +858,21 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerProcessManager taskManager =
- buildContainerProcessManager(new MapConfig(conf), state,
clusterResourceManager, Optional.empty());
- taskManager.start();
+
+ ContainerProcessManager cpm =
+ buildContainerProcessManager(new ClusterManagerConfig(conf), state,
clusterResourceManager, Optional.empty());
+ cpm.start();
SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
- assertFalse(taskManager.shouldShutdown());
- taskManager.onResourceAllocated(container2);
+ assertFalse(cpm.shouldShutdown());
+ cpm.onResourceAllocated(container2);
configVals.put(JobConfig.SAMZA_FWK_PATH, "/export/content/whatever");
Config config1 = new MapConfig(configVals);
- ContainerProcessManager taskManager1 =
- buildContainerProcessManager(new MapConfig(config), state,
clusterResourceManager, Optional.empty());
- taskManager1.start();
- taskManager1.onResourceAllocated(container2);
+ ContainerProcessManager cpm1 =
+ buildContainerProcessManager(new ClusterManagerConfig(config), state,
clusterResourceManager, Optional.empty());
+ cpm1.start();
+ cpm1.onResourceAllocated(container2);
}
@After
@@ -814,9 +880,13 @@ public class TestContainerProcessManager {
server.stop();
}
+ /**
+ * Builds the {@link ContainerProcessManager} under test with a fresh {@link MetricsRegistryMap} and
+ * this test class's class loader; {@code allocator} is passed through unchanged.
+ */
- private ContainerProcessManager buildContainerProcessManager(Config config,
SamzaApplicationState state,
+ private ContainerProcessManager
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
ClusterResourceManager clusterResourceManager,
Optional<AbstractContainerAllocator> allocator) {
- return new ContainerProcessManager(config, state, new
MetricsRegistryMap(), clusterResourceManager, allocator,
+ return new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(), clusterResourceManager, allocator,
getClass().getClassLoader());
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index caae4a7..1877547 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -19,12 +19,26 @@
package org.apache.samza.clustermanager;
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
public class TestContainerRequestState {
@@ -89,7 +103,7 @@ public class TestContainerRequestState {
assertEquals(resource, state.getResourcesOnAHost(ANY_HOST).get(0));
// Container Allocated when there is no request in queue
- ResourceRequestState state1 = new ResourceRequestState(true, manager);
+ ResourceRequestState state1 = spy(new ResourceRequestState(true, manager));
SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id2");
state1.addResource(container1);
@@ -103,7 +117,17 @@ public class TestContainerRequestState {
// Container Allocated on a Requested Host
state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", "0"));
+ // Delayed request
+ state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "def", "1",
+ Instant.now().plus(Duration.ofHours(1))));
+ state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "ghi", "2",
+ Instant.now().plus(Duration.ofHours(2))));
+
assertEquals(1, state1.numPendingRequests());
+ assertEquals(2, state1.numDelayedRequests());
+
+ // Verify request sent only once for the non-delayed request
+ verify(state1).sendResourceRequest(any(SamzaResourceRequest.class));
assertNotNull(state1.getHostRequestCounts());
assertNotNull(state1.getHostRequestCounts().get("abc"));
@@ -215,6 +239,84 @@ public class TestContainerRequestState {
state.releaseResource("id1");
assertEquals(0, state.getResourcesOnAHost("abc").size());
assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
+ }
+
+ /**
+ * Verifies the natural ordering of {@link SamzaResourceRequest}: requests for active processors
+ * are polled before requests for standby processors, and by request timestamp within each group.
+ */
+ @Test
+ public void testPriorityQueueOrdering() {
+ PriorityQueue<SamzaResourceRequest> pq = new PriorityQueue<>();
+ Instant now = Instant.now();
+
+ ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
+ createRequestForActive(now.minusSeconds(120)),
+ createRequestForActive(now),
+ createRequestForActive(now.plusSeconds(120)),
+ createRequestForActive(now.plusSeconds(240)),
+ createRequestForStandby(now.minusSeconds(120)),
+ createRequestForStandby(now),
+ createRequestForStandby(now.plusSeconds(120)),
+ createRequestForStandby(now.plusSeconds(240)));
+
+ // The seed is included in the failure message so that a failing shuffle can be reproduced.
+ long seed = Instant.now().toEpochMilli();
+ List<SamzaResourceRequest> shuffled = Arrays.asList(expectedOrder.toArray(new SamzaResourceRequest[0]));
+ Collections.shuffle(shuffled, new Random(seed));
+ pq.addAll(shuffled);
+
+ List<SamzaResourceRequest> priorityQueueOrder = new ArrayList<>();
+ for (int i = 0; i < expectedOrder.size(); ++i) {
+ priorityQueueOrder.add(pq.poll());
+ }
+ assertEquals("Unexpected order for shuffle seed " + seed, expectedOrder, priorityQueueOrder);
+ }
+
+ /**
+ * Verifies that {@link ResourceRequestState.DelayedRequestQueue} orders requests by request
+ * timestamp only, regardless of whether they are for active or standby processors.
+ */
+ @Test
+ public void testDelayedQueueOrdering() {
+ ResourceRequestState.DelayedRequestQueue delayedRequestQueue = new ResourceRequestState.DelayedRequestQueue();
+ Instant now = Instant.now();
+
+ // Expected priority by request timestamp only, regardless of active or standby
+ ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
+ createRequestForActive(now),
+ createRequestForStandby(now.plusSeconds(60)),
+ createRequestForActive(now.plusSeconds(120)),
+ createRequestForStandby(now.plusSeconds(121)),
+ createRequestForActive(now.plusSeconds(240)),
+ createRequestForStandby(now.plusSeconds(241)));
+
+ // The seed is included in the failure message so that a failing shuffle can be reproduced.
+ long seed = Instant.now().toEpochMilli();
+ List<SamzaResourceRequest> shuffled = Arrays.asList(expectedOrder.toArray(new SamzaResourceRequest[0]));
+ Collections.shuffle(shuffled, new Random(seed));
+ delayedRequestQueue.addAll(shuffled);
+
+ List<SamzaResourceRequest> delayedQueueOrder = new ArrayList<>();
+ for (int i = 0; i < expectedOrder.size(); ++i) {
+ delayedQueueOrder.add(delayedRequestQueue.poll());
+ }
+ assertEquals("Unexpected order for shuffle seed " + seed, expectedOrder, delayedQueueOrder);
+ }
+
+ SamzaResourceRequest createRequestForActive(Instant requestTime) {
+ return createRequest(RandomStringUtils.randomAlphanumeric(8), requestTime);
+ }
+
+ SamzaResourceRequest createRequestForStandby(Instant requestTime) {
+ // A hyphen in the processor ID denotes a standby processor.
+ return createRequest(RandomStringUtils.randomAlphanumeric(8) + "-standby", requestTime);
+ }
+
+ /** Creates a minimal request for {@code processorId} on a random host at {@code requestTime}. */
+ private SamzaResourceRequest createRequest(String processorId, Instant requestTime) {
+ String randomHost = RandomStringUtils.randomAlphanumeric(4);
+ return new SamzaResourceRequest(1, 1, randomHost, processorId, requestTime);
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index d9dfe45..40d292c 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -18,14 +18,14 @@
*/
package org.apache.samza.clustermanager;
+import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.collect.ImmutableMap;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
@@ -249,10 +249,26 @@ public class TestHostAwareContainerAllocator {
assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get());
}
+ /**
+ * Verifies that requests made with a delay are tracked as delayed requests and are not sent to
+ * the cluster resource manager, while the immediate request is sent right away.
+ */
+ @Test
+ public void testDelayedRequestedContainers() {
+ containerAllocator.requestResource("0", "abc");
+ containerAllocator.requestResourceWithDelay("0", "efg", Duration.ofHours(2));
+ containerAllocator.requestResourceWithDelay("0", "hij", Duration.ofHours(3));
+ containerAllocator.requestResourceWithDelay("0", "klm", Duration.ofHours(4));
+
+ assertNotNull(clusterResourceManager.resourceRequests);
+ assertEquals(1, clusterResourceManager.resourceRequests.size());
+ assertEquals(1, requestState.numPendingRequests());
+ assertEquals(3, requestState.numDelayedRequests());
+ }
+
/**
* Handles expired requests correctly and assigns ANY_HOST
*/
-
@Test
public void testExpiredRequestAreAssignedToAnyHost() throws Exception {
final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");