This is an automated email from the ASF dual-hosted git repository.
jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8736797 Improved standby-aware container allocation for
active-containers on job redeploys
8736797 is described below
commit 87367975c693c4a06f5e56332374c7f95d39eb29
Author: Ray Matharu <[email protected]>
AuthorDate: Tue Mar 26 19:11:33 2019 -0700
Improved standby-aware container allocation for active-containers on job
redeploys
Author: Ray Matharu <[email protected]>
Reviewers: Jagadish<[email protected]>
Closes #952 from rmatharu/test-standbyimprovements
---
.../clustermanager/StandbyContainerManager.java | 236 ++++++++++++---------
.../samza/job/yarn/YarnClusterResourceManager.java | 5 -
2 files changed, 133 insertions(+), 108 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index db27238..f240757 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -25,8 +25,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.job.model.JobModel;
-import org.apache.samza.storage.kv.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,10 +92,10 @@ public class StandbyContainerManager {
case SamzaResourceStatus.DISK_FAIL:
case SamzaResourceStatus.ABORTED:
case SamzaResourceStatus.PREEMPTED:
- initiateActiveContainerFailover(containerID, resourceID,
containerAllocator);
+ initiateStandbyAwareAllocation(containerID, resourceID,
containerAllocator);
break;
- // in all other cases, request-resource for the failed container, but
record the resource-request, so that
- // if this request expires, we can do a failover -- select a standby to
stop & start the active on standby's host
+ // in all other cases, request-resource for the failed container, but
record the resource-request, so that
+ // if this request expires, we can do a failover -- select a standby
to stop & start the active on standby's host
default:
log.info("Requesting resource for active-container {} on host {}",
containerID, preferredHost);
SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(containerID, preferredHost);
@@ -119,7 +120,7 @@ public class StandbyContainerManager {
log.info("Handling launch fail for standby-container {}, requesting
resource on any host {}", containerID);
containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
} else {
- initiateActiveContainerFailover(containerID, resourceID,
containerAllocator);
+ initiateStandbyAwareAllocation(containerID, resourceID,
containerAllocator);
}
}
@@ -147,7 +148,8 @@ public class StandbyContainerManager {
activeContainerID, standbyContainerHostname, standbyContainerID);
// request standbycontainer's host for active-container
- SamzaResourceRequest resourceRequestForActive =
containerAllocator.getResourceRequest(activeContainerID,
standbyContainerHostname);
+ SamzaResourceRequest resourceRequestForActive =
+ containerAllocator.getResourceRequest(activeContainerID,
standbyContainerHostname);
// record the resource request, before issuing it to avoid race with
allocation-thread
failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
@@ -161,60 +163,90 @@ public class StandbyContainerManager {
}
}
- /** Method to handle failover for an active container.
- * We try to find a standby for the active container, and issue a stop on
it.
- * If we do not find a standby container, we simply issue an anyhost
request to place it.
+ /** Method to handle standby-aware allocation for an active container.
+ * We try to find a standby host for the active container, and issue a stop
on any standby-containers running on it,
+ * request resource to place the active on the standby's host, and one to
place the standby elsewhere.
*
- * @param containerID the samzaContainerID of the active-container
+ * @param activeContainerID the samzaContainerID of the active-container
* @param resourceID the samza-resource-ID of the container when it failed
(used to index failover-state)
*/
- private void initiateActiveContainerFailover(String containerID, String
resourceID,
+ private void initiateStandbyAwareAllocation(String activeContainerID, String
resourceID,
AbstractContainerAllocator containerAllocator) {
- Optional<Entry<String, SamzaResource>> standbyContainer =
this.selectStandby(containerID, resourceID);
+ String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);
- // If we find a standbyContainer, we initiate a failover
- if (standbyContainer.isPresent()) {
-
- String standbyContainerId = standbyContainer.get().getKey();
- SamzaResource standbyResource = standbyContainer.get().getValue();
- String standbyResourceID = standbyResource.getResourceID();
- String standbyHost = standbyResource.getHost();
-
- // update the state
- FailoverMetadata failoverMetadata =
this.registerActiveContainerFailure(containerID, resourceID);
- failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost);
-
- log.info("Initiating failover and stopping standby container, found
standbyContainer {} = resource {}, "
- + "for active container {}", standbyContainerId, standbyResourceID,
containerID);
- samzaApplicationState.failoversToStandby.incrementAndGet();
- this.clusterResourceManager.stopStreamProcessor(standbyResource);
+ // if the standbyHost returned is anyhost, we proceed with the request
directly
+ if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
+ log.info("No standby container found for active container {}, making a
resource-request for placing {} on {}, active's resourceID: {}",
+ activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST,
resourceID);
+ samzaApplicationState.failoversToAnyHost.incrementAndGet();
+ containerAllocator.requestResource(activeContainerID,
ResourceRequestState.ANY_HOST);
} else {
- // If we dont find a standbyContainer, we proceed with the ANYHOST
request
- log.info("No standby container found for active container {}, making a
request for {}", containerID,
- ResourceRequestState.ANY_HOST);
- samzaApplicationState.failoversToAnyHost.incrementAndGet();
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
+ // Check if there is a running standby-container on that host that needs
to be stopped
+ List<String> standbySamzaContainerIds =
this.standbyContainerConstraints.get(activeContainerID);
+
+ Map<String, SamzaResource> runningStandbyContainersOnHost = new
HashMap<>();
+ this.samzaApplicationState.runningContainers.forEach((samzaContainerId,
samzaResource) -> {
+ if (standbySamzaContainerIds.contains(samzaContainerId) &&
samzaResource.getHost().equals(standbyHost)) {
+ runningStandbyContainersOnHost.put(samzaContainerId,
samzaResource);
+ }
+ });
+
+ if (runningStandbyContainersOnHost.isEmpty()) {
+ // if there are no running standby-containers on the standbyHost, we
proceed to directly make a resource request
+
+ log.info("No running standby container to stop on host {}, making a
resource-request for placing {} on {}, active's resourceID: {}",
+ standbyHost, activeContainerID, standbyHost, resourceID);
+ FailoverMetadata failoverMetadata =
this.registerActiveContainerFailure(activeContainerID, resourceID);
+
+ // record the resource request, before issuing it to avoid race with
allocation-thread
+ SamzaResourceRequest resourceRequestForActive =
+ containerAllocator.getResourceRequest(activeContainerID,
standbyHost);
+ failoverMetadata.recordResourceRequest(resourceRequestForActive);
+ containerAllocator.issueResourceRequest(resourceRequestForActive);
+ samzaApplicationState.failoversToStandby.incrementAndGet();
+ } else {
+ // if there is a running standby-container on the standbyHost, we
issue a stop (the stopComplete callback completes the remainder of the flow)
+ FailoverMetadata failoverMetadata =
this.registerActiveContainerFailure(activeContainerID, resourceID);
+
+ runningStandbyContainersOnHost.forEach((standbyContainerID,
standbyResource) -> {
+ log.info("Initiating failover and stopping standby container,
found standbyContainer {} = resource {}, "
+ + "for active container {}",
runningStandbyContainersOnHost.keySet(),
+ runningStandbyContainersOnHost.values(), activeContainerID);
+
failoverMetadata.updateStandbyContainer(standbyResource.getResourceID(),
standbyResource.getHost());
+ samzaApplicationState.failoversToStandby.incrementAndGet();
+ this.clusterResourceManager.stopStreamProcessor(standbyResource);
+ });
+
+ // if multiple standbys are on the same host, we are in an invalid
state, so we fail the deploy and retry
+ if (runningStandbyContainersOnHost.size() > 1) {
+ throw new SamzaException(
+ "Invalid State. Multiple standby containers found running on one
host:" + runningStandbyContainersOnHost);
+ }
+ }
}
}
/**
- * Method to select a standby container for a given active container that
has stopped.
- * TODO: enrich this method to select standby's intelligently based on lag,
timestamp, load-balencing, etc.
+ * Method to select a standby host for a given active container.
+ * 1. We first try to select a host which has a running standby-container,
that we haven't already selected for failover.
+ * 2. If we don't find any such host, we iterate over the last-known standbyHosts and pick one
we haven't already selected for failover.
+ * 3. If we still don't find a host, we fall back to AnyHost.
+ *
+ * See https://issues.apache.org/jira/browse/SAMZA-2140
+ *
* @param activeContainerID Samza containerID of the active container
* @param activeContainerResourceID ResourceID of the active container at
the time of its last failure
- * @return
+ * @return standby host for the active container (if found), Any-host
otherwise.
*/
- private Optional<Entry<String, SamzaResource>> selectStandby(String
activeContainerID,
- String activeContainerResourceID) {
+ private String selectStandbyHost(String activeContainerID, String
activeContainerResourceID) {
- log.info("Standby containers {} for active container {}",
this.standbyContainerConstraints.get(activeContainerID), activeContainerID);
+ log.info("Standby containers {} for active container {} (resourceID {})",
this.standbyContainerConstraints.get(activeContainerID), activeContainerID,
activeContainerResourceID);
// obtain any existing failover metadata
- Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
- activeContainerResourceID == null ? Optional.empty() :
this.getFailoverMetadata(activeContainerResourceID);
+ Optional<FailoverMetadata> failoverMetadata =
getFailoverMetadata(activeContainerResourceID);
// Iterate over the list of running standby containers, to find a standby
resource that we have not already
// used for a failover for this active resoruce
@@ -224,17 +256,39 @@ public class StandbyContainerManager {
SamzaResource standbyContainerResource =
samzaApplicationState.runningContainers.get(standbyContainerID);
// use this standby if there was no previous failover for which this
standbyResource was used
- if (!(failoverMetadata.isPresent() &&
failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID())))
{
+ if (!(failoverMetadata.isPresent() && failoverMetadata.get()
+ .isStandbyResourceUsed(standbyContainerResource.getResourceID())))
{
- log.info("Returning standby container {} in running state for active
container {}", standbyContainerID,
- activeContainerID);
- return Optional.of(new Entry<>(standbyContainerID,
standbyContainerResource));
+ log.info("Returning standby container {} in running state on host {}
for active container {}",
+ standbyContainerID, standbyContainerResource.getHost(),
activeContainerID);
+ return standbyContainerResource.getHost();
}
}
}
-
log.info("Did not find any running standby container for active container
{}", activeContainerID);
- return Optional.empty();
+
+ // We iterate over the list of last-known standbyHosts to find one
that has not already been tried
+ for (String standbyContainerID :
this.standbyContainerConstraints.get(activeContainerID)) {
+
+ String standbyHost =
this.samzaApplicationState.jobModelManager.jobModel().
+ getContainerToHostValue(standbyContainerID,
SetContainerHostMapping.HOST_KEY);
+
+ if (standbyHost == null || standbyHost.isEmpty()) {
+ log.info("No last known standbyHost for container {}",
standbyContainerID);
+
+ } else if (failoverMetadata.isPresent() &&
failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
+
+ log.info("Not using standby host {} for active container {} because it
had already been selected", standbyHost,
+ activeContainerID);
+ } else {
+ // standbyHost is valid and has not already been selected
+ log.info("Returning standby host {} for active container {}",
standbyHost, activeContainerID);
+ return standbyHost;
+ }
+ }
+
+ log.info("Did not find any standby host for active container {}, returning
any-host", activeContainerID);
+ return ResourceRequestState.ANY_HOST;
}
/**
@@ -321,11 +375,13 @@ public class StandbyContainerManager {
if (checkStandbyConstraints(request, samzaResource)) {
// This resource can be used to launch this container
- log.info("Running container {} on {} meets standby constraints,
preferredHost = {}", containerID, samzaResource.getHost(), preferredHost);
+ log.info("Running container {} on {} meets standby constraints,
preferredHost = {}", containerID,
+ samzaResource.getHost(), preferredHost);
containerAllocator.runStreamProcessor(request, preferredHost);
} else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
// This resource cannot be used to launch this standby container, so we
make a new anyhost request
- log.info("Running standby container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource, and making a new
ANY_HOST request",
+ log.info(
+ "Running standby container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource, and making a new
ANY_HOST request",
containerID, samzaResource.getHost());
resourceRequestState.releaseUnstartableContainer(samzaResource,
preferredHost);
resourceRequestState.cancelResourceRequest(request);
@@ -333,24 +389,16 @@ public class StandbyContainerManager {
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
} else {
// This resource cannot be used to launch this active container
container, so we initiate a failover
- log.warn("Running active container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource",
+ log.warn(
+ "Running active container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource",
containerID, samzaResource.getHost());
resourceRequestState.releaseUnstartableContainer(samzaResource,
preferredHost);
resourceRequestState.cancelResourceRequest(request);
Optional<FailoverMetadata> failoverMetadata =
getFailoverMetadata(request);
-
- // if this active-container has never failed, then simple request anyhost
- if (!failoverMetadata.isPresent()) {
- log.info("Requesting ANY_HOST for active container {}", containerID);
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
- } else {
- log.info("Initiating failover for active container {}", containerID);
- // we use the activeContainer's last resourceID to initiate the
failover
- String lastKnownResourceID =
failoverMetadata.get().activeContainerResourceID;
- initiateActiveContainerFailover(containerID, lastKnownResourceID,
containerAllocator);
- }
-
+ String lastKnownResourceID =
+ failoverMetadata.isPresent() ?
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
+ initiateStandbyAwareAllocation(containerID, lastKnownResourceID,
containerAllocator);
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
}
}
@@ -368,9 +416,10 @@ public class StandbyContainerManager {
ResourceRequestState resourceRequestState) {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
- handleExpiredRequestForStandbyContainer(containerID, request,
alternativeResource, containerAllocator, resourceRequestState);
+ handleExpiredRequestForStandbyContainer(containerID, request,
alternativeResource, containerAllocator,
+ resourceRequestState);
} else {
- handleExpiredRequestForActiveContainer(containerID, request,
alternativeResource, containerAllocator, resourceRequestState);
+ handleExpiredRequestForActiveContainer(containerID, request,
containerAllocator, resourceRequestState);
}
}
@@ -382,10 +431,10 @@ public class StandbyContainerManager {
if (alternativeResource.isPresent()) {
// A standby container can be started on the
anyhost-alternative-resource rightaway provided it passes all the
// standby constraints
- log.info("Handling expired request, standby container {} can be started
on alternative resource {}", containerID, alternativeResource.get());
+ log.info("Handling expired request, standby container {} can be started
on alternative resource {}", containerID,
+ alternativeResource.get());
checkStandbyConstraintsAndRunStreamProcessor(request,
ResourceRequestState.ANY_HOST, alternativeResource.get(),
containerAllocator, resourceRequestState);
-
} else {
// If there is no alternative-resource for the standby container we make
a new anyhost request
log.info("Handling expired request, requesting anyHost resource for
standby container {}", containerID);
@@ -396,45 +445,20 @@ public class StandbyContainerManager {
// Handle an expired resource request that was made for placing an active
container
private void handleExpiredRequestForActiveContainer(String containerID,
SamzaResourceRequest request,
- Optional<SamzaResource> alternativeResource, AbstractContainerAllocator
containerAllocator,
- ResourceRequestState resourceRequestState) {
+ AbstractContainerAllocator containerAllocator, ResourceRequestState
resourceRequestState) {
+ log.info("Handling expired request for active container {}", containerID);
Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
-
- // An active container can be started on the alternative-any-host resource
rightaway, if it has no prior failure,
- // that is, there is no failoverMetadata associated with this
resource-request
- if (alternativeResource.isPresent() && !failoverMetadata.isPresent()) {
-
- log.info("Handling expired request, trying to run active container {} on
alternative resource {}", containerID, alternativeResource.get());
-
- checkStandbyConstraintsAndRunStreamProcessor(request,
ResourceRequestState.ANY_HOST, alternativeResource.get(),
- containerAllocator, resourceRequestState);
-
- } else if (!failoverMetadata.isPresent() &&
!alternativeResource.isPresent()) {
- // An active container has no prior failure, and there is
no-alternative-anyhost resource, so we make a new anyhost request
- log.info("Handling expired request, requesting anyHost resource for
active container {} because this active container has never failed",
containerID);
-
- resourceRequestState.cancelResourceRequest(request);
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
-
- } else if (failoverMetadata.isPresent()) {
- // An active container that had failed, and whose subsequent resource
request has expired, needs to be failed over to
- // a new standby-candidate, so we initiate a failover
-
- log.info("Handling expired request, initiating failover for active
container {}", containerID);
-
- resourceRequestState.cancelResourceRequest(request);
-
- // we use the activeContainer's resourceID to initiate the failover
- String lastKnownResourceID =
failoverMetadata.get().activeContainerResourceID;
- initiateActiveContainerFailover(containerID, lastKnownResourceID,
containerAllocator);
-
- } else {
- log.error("Handling expired request, invalid state containerID {},
resource request {}", containerID, request);
- }
+ resourceRequestState.cancelResourceRequest(request);
+
+ // we use the activeContainer's resourceID to initiate the failover and
index failover-state
+ // if there is no prior failure for this active-container, we use "unknown-<containerID>"
+ String lastKnownResourceID =
+ failoverMetadata.isPresent() ?
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
+ log.info("Handling expired request for active container {},
lastKnownResourceID is {}", containerID, lastKnownResourceID);
+ initiateStandbyAwareAllocation(containerID, lastKnownResourceID,
containerAllocator);
}
-
/**
* Check if a activeContainerResource has failover-metadata associated with
it
*/
@@ -455,7 +479,6 @@ public class StandbyContainerManager {
return Optional.empty();
}
-
@Override
public String toString() {
return this.failovers.toString();
@@ -501,11 +524,18 @@ public class StandbyContainerManager {
this.resourceRequests.add(samzaResourceRequest);
}
- // Check if this resource request is used for this failover
+ // Check if this resource request has been issued in this failover
public synchronized boolean containsResourceRequest(SamzaResourceRequest
samzaResourceRequest) {
return this.resourceRequests.contains(samzaResourceRequest);
}
+ // Check if this standby-host has been used in this failover, that is, if
this host matches the host of a selected-standby,
+ // or if we have made a resourceRequest for this host
+ public boolean isStandbyHostUsed(String standbyHost) {
+ return this.selectedStandbyContainers.values().contains(standbyHost)
+ || this.resourceRequests.stream().filter(request ->
request.getPreferredHost().equals(standbyHost)).count() > 0;
+ }
+
@Override
public String toString() {
return "[activeContainerID: " + this.activeContainerID + "
activeContainerResourceID: "
diff --git
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 6d0fdb1..3a66573 100644
---
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -563,11 +563,6 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
if (samzaContainerId != null) {
YarnContainer container =
state.runningYarnContainers.get(samzaContainerId);
log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ",
containerId.toString(), samzaContainerId);
- SamzaResource resource = new
SamzaResource(container.resource().getVirtualCores(),
- container.resource().getMemory(), container.nodeId().getHost(),
containerId.toString());
-
- log.info("Re-invoking stop stream processor for container: {}",
containerId);
- this.stopStreamProcessor(resource);// For now, we retry the stopping of
the container
} else {
log.info("Got an invalid notification for container: {}",
containerId.toString());
}