This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8736797  Improved standby-aware container allocation for 
active-containers on job redeploys
8736797 is described below

commit 87367975c693c4a06f5e56332374c7f95d39eb29
Author: Ray Matharu <rmath...@linkedin.com>
AuthorDate: Tue Mar 26 19:11:33 2019 -0700

    Improved standby-aware container allocation for active-containers on job 
redeploys
    
    Author: Ray Matharu <rmath...@linkedin.com>
    
    Reviewers: Jagadish<jagad...@apache.org>
    
    Closes #952 from rmatharu/test-standbyimprovements
---
 .../clustermanager/StandbyContainerManager.java    | 236 ++++++++++++---------
 .../samza/job/yarn/YarnClusterResourceManager.java |   5 -
 2 files changed, 133 insertions(+), 108 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index db27238..f240757 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -25,8 +25,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.storage.kv.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,10 +92,10 @@ public class StandbyContainerManager {
         case SamzaResourceStatus.DISK_FAIL:
         case SamzaResourceStatus.ABORTED:
         case SamzaResourceStatus.PREEMPTED:
-          initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+          initiateStandbyAwareAllocation(containerID, resourceID, 
containerAllocator);
           break;
-      // in all other cases, request-resource for the failed container, but 
record the resource-request, so that
-      // if this request expires, we can do a failover -- select a standby to 
stop & start the active on standby's host
+        // in all other cases, request-resource for the failed container, but 
record the resource-request, so that
+        // if this request expires, we can do a failover -- select a standby 
to stop & start the active on standby's host
         default:
           log.info("Requesting resource for active-container {} on host {}", 
containerID, preferredHost);
           SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(containerID, preferredHost);
@@ -119,7 +120,7 @@ public class StandbyContainerManager {
       log.info("Handling launch fail for standby-container {}, requesting 
resource on any host {}", containerID);
       containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
     } else {
-      initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+      initiateStandbyAwareAllocation(containerID, resourceID, 
containerAllocator);
     }
   }
 
@@ -147,7 +148,8 @@ public class StandbyContainerManager {
           activeContainerID, standbyContainerHostname, standbyContainerID);
 
       // request standbycontainer's host for active-container
-      SamzaResourceRequest resourceRequestForActive = 
containerAllocator.getResourceRequest(activeContainerID, 
standbyContainerHostname);
+      SamzaResourceRequest resourceRequestForActive =
+          containerAllocator.getResourceRequest(activeContainerID, 
standbyContainerHostname);
       // record the resource request, before issuing it to avoid race with 
allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
@@ -161,60 +163,90 @@ public class StandbyContainerManager {
     }
   }
 
-  /** Method to handle failover for an active container.
-   *  We try to find a standby for the active container, and issue a stop on 
it.
-   *  If we do not find a standby container, we simply issue an anyhost 
request to place it.
+  /** Method to handle standby-aware allocation for an active container.
+   *  We try to find a standby host for the active container, and issue a stop 
on any standby-containers running on it,
+   *  request resource to place the active on the standby's host, and one to 
place the standby elsewhere.
    *
-   * @param containerID the samzaContainerID of the active-container
+   * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed 
(used to index failover-state)
    */
-  private void initiateActiveContainerFailover(String containerID, String 
resourceID,
+  private void initiateStandbyAwareAllocation(String activeContainerID, String 
resourceID,
       AbstractContainerAllocator containerAllocator) {
 
-    Optional<Entry<String, SamzaResource>> standbyContainer = 
this.selectStandby(containerID, resourceID);
+    String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);
 
-    // If we find a standbyContainer, we initiate a failover
-    if (standbyContainer.isPresent()) {
-
-      String standbyContainerId = standbyContainer.get().getKey();
-      SamzaResource standbyResource = standbyContainer.get().getValue();
-      String standbyResourceID = standbyResource.getResourceID();
-      String standbyHost = standbyResource.getHost();
-
-      // update the state
-      FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(containerID, resourceID);
-      failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost);
-
-      log.info("Initiating failover and stopping standby container, found 
standbyContainer {} = resource {}, "
-          + "for active container {}", standbyContainerId, standbyResourceID, 
containerID);
-      samzaApplicationState.failoversToStandby.incrementAndGet();
-      this.clusterResourceManager.stopStreamProcessor(standbyResource);
+    // if the standbyHost returned is anyhost, we proceed with the request 
directly
+    if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
+      log.info("No standby container found for active container {}, making a 
resource-request for placing {} on {}, active's resourceID: {}",
+          activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST, 
resourceID);
+      samzaApplicationState.failoversToAnyHost.incrementAndGet();
+      containerAllocator.requestResource(activeContainerID, 
ResourceRequestState.ANY_HOST);
 
     } else {
 
-      // If we dont find a standbyContainer, we proceed with the ANYHOST 
request
-      log.info("No standby container found for active container {}, making a 
request for {}", containerID,
-          ResourceRequestState.ANY_HOST);
-      samzaApplicationState.failoversToAnyHost.incrementAndGet();
-      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      // Check if there is a running standby-container on that host that needs 
to be stopped
+      List<String> standbySamzaContainerIds = 
this.standbyContainerConstraints.get(activeContainerID);
+
+      Map<String, SamzaResource> runningStandbyContainersOnHost = new 
HashMap<>();
+      this.samzaApplicationState.runningContainers.forEach((samzaContainerId, 
samzaResource) -> {
+          if (standbySamzaContainerIds.contains(samzaContainerId) && 
samzaResource.getHost().equals(standbyHost)) {
+            runningStandbyContainersOnHost.put(samzaContainerId, 
samzaResource);
+          }
+        });
+
+      if (runningStandbyContainersOnHost.isEmpty()) {
+        // if there are no running standby-containers on the standbyHost, we 
proceed to directly make a resource request
+
+        log.info("No running standby container to stop on host {}, making a 
resource-request for placing {} on {}, active's resourceID: {}",
+            standbyHost, activeContainerID, standbyHost, resourceID);
+        FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(activeContainerID, resourceID);
+
+        // record the resource request, before issuing it to avoid race with 
allocation-thread
+        SamzaResourceRequest resourceRequestForActive =
+            containerAllocator.getResourceRequest(activeContainerID, 
standbyHost);
+        failoverMetadata.recordResourceRequest(resourceRequestForActive);
+        containerAllocator.issueResourceRequest(resourceRequestForActive);
+        samzaApplicationState.failoversToStandby.incrementAndGet();
+      } else {
+        // if there is a running standby-container on the standbyHost, we 
issue a stop (the stopComplete callback completes the remainder of the flow)
+        FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(activeContainerID, resourceID);
+
+        runningStandbyContainersOnHost.forEach((standbyContainerID, 
standbyResource) -> {
+            log.info("Initiating failover and stopping standby container, 
found standbyContainer {} = resource {}, "
+                    + "for active container {}", 
runningStandbyContainersOnHost.keySet(),
+                runningStandbyContainersOnHost.values(), activeContainerID);
+            
failoverMetadata.updateStandbyContainer(standbyResource.getResourceID(), 
standbyResource.getHost());
+            samzaApplicationState.failoversToStandby.incrementAndGet();
+            this.clusterResourceManager.stopStreamProcessor(standbyResource);
+          });
+
+        // if multiple standbys are on the same host, we are in an invalid 
state, so we fail the deploy and retry
+        if (runningStandbyContainersOnHost.size() > 1) {
+          throw new SamzaException(
+              "Invalid State. Multiple standby containers found running on one 
host:" + runningStandbyContainersOnHost);
+        }
+      }
     }
   }
 
   /**
-   * Method to select a standby container for a given active container that 
has stopped.
-   * TODO: enrich this method to select standby's intelligently based on lag, 
timestamp, load-balencing, etc.
+   * Method to select a standby host for a given active container.
+   * 1. We first try to select a host which has a running standby-container, 
that we haven't already selected for failover.
+   * 2. If we don't find any such host, we iterate over last-known standbyHosts, if 
we haven't already selected it for failover.
+   * 3. If we still don't find a host, we fall back to AnyHost.
+   *
+   * See https://issues.apache.org/jira/browse/SAMZA-2140
+   *
    * @param activeContainerID Samza containerID of the active container
    * @param activeContainerResourceID ResourceID of the active container at 
the time of its last failure
-   * @return
+   * @return standby host for the active container (if found), Any-host 
otherwise.
    */
-  private Optional<Entry<String, SamzaResource>> selectStandby(String 
activeContainerID,
-      String activeContainerResourceID) {
+  private String selectStandbyHost(String activeContainerID, String 
activeContainerResourceID) {
 
-    log.info("Standby containers {} for active container {}", 
this.standbyContainerConstraints.get(activeContainerID), activeContainerID);
+    log.info("Standby containers {} for active container {} (resourceID {})", 
this.standbyContainerConstraints.get(activeContainerID), activeContainerID, 
activeContainerResourceID);
 
     // obtain any existing failover metadata
-    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata =
-        activeContainerResourceID == null ? Optional.empty() : 
this.getFailoverMetadata(activeContainerResourceID);
+    Optional<FailoverMetadata> failoverMetadata = 
getFailoverMetadata(activeContainerResourceID);
 
     // Iterate over the list of running standby containers, to find a standby 
resource that we have not already
     // used for a failover for this active resource
@@ -224,17 +256,39 @@ public class StandbyContainerManager {
         SamzaResource standbyContainerResource = 
samzaApplicationState.runningContainers.get(standbyContainerID);
 
         // use this standby if there was no previous failover for which this 
standbyResource was used
-        if (!(failoverMetadata.isPresent() && 
failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID())))
 {
+        if (!(failoverMetadata.isPresent() && failoverMetadata.get()
+            .isStandbyResourceUsed(standbyContainerResource.getResourceID()))) 
{
 
-          log.info("Returning standby container {} in running state for active 
container {}", standbyContainerID,
-              activeContainerID);
-          return Optional.of(new Entry<>(standbyContainerID, 
standbyContainerResource));
+          log.info("Returning standby container {} in running state on host {} 
for active container {}",
+              standbyContainerID, standbyContainerResource.getHost(), 
activeContainerID);
+          return standbyContainerResource.getHost();
         }
       }
     }
-
     log.info("Did not find any running standby container for active container 
{}", activeContainerID);
-    return Optional.empty();
+
+    // We iterate over the list of last-known standbyHosts to check if any one 
of them has not already been tried
+    for (String standbyContainerID : 
this.standbyContainerConstraints.get(activeContainerID)) {
+
+      String standbyHost = 
this.samzaApplicationState.jobModelManager.jobModel().
+          getContainerToHostValue(standbyContainerID, 
SetContainerHostMapping.HOST_KEY);
+
+      if (standbyHost == null || standbyHost.isEmpty()) {
+        log.info("No last known standbyHost for container {}", 
standbyContainerID);
+
+      } else if (failoverMetadata.isPresent() && 
failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
+
+        log.info("Not using standby host {} for active container {} because it 
had already been selected", standbyHost,
+            activeContainerID);
+      } else {
+        // standbyHost is valid and has not already been selected
+        log.info("Returning standby host {} for active container {}", 
standbyHost, activeContainerID);
+        return standbyHost;
+      }
+    }
+
+    log.info("Did not find any standby host for active container {}, returning 
any-host", activeContainerID);
+    return ResourceRequestState.ANY_HOST;
   }
 
   /**
@@ -321,11 +375,13 @@ public class StandbyContainerManager {
 
     if (checkStandbyConstraints(request, samzaResource)) {
       // This resource can be used to launch this container
-      log.info("Running container {} on {} meets standby constraints, 
preferredHost = {}", containerID, samzaResource.getHost(), preferredHost);
+      log.info("Running container {} on {} meets standby constraints, 
preferredHost = {}", containerID,
+          samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
     } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       // This resource cannot be used to launch this standby container, so we 
make a new anyhost request
-      log.info("Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+      log.info(
+          "Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
           containerID, samzaResource.getHost());
       resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
       resourceRequestState.cancelResourceRequest(request);
@@ -333,24 +389,16 @@ public class StandbyContainerManager {
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
       // This resource cannot be used to launch this active container, 
so we initiate a failover
-      log.warn("Running active container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource",
+      log.warn(
+          "Running active container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
       resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
       resourceRequestState.cancelResourceRequest(request);
 
       Optional<FailoverMetadata> failoverMetadata = 
getFailoverMetadata(request);
-
-      // if this active-container has never failed, then simple request anyhost
-      if (!failoverMetadata.isPresent()) {
-        log.info("Requesting ANY_HOST for active container {}", containerID);
-        containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
-      } else {
-        log.info("Initiating failover for active container {}", containerID);
-        // we use the activeContainer's last resourceID to initiate the 
failover
-        String lastKnownResourceID = 
failoverMetadata.get().activeContainerResourceID;
-        initiateActiveContainerFailover(containerID, lastKnownResourceID, 
containerAllocator);
-      }
-
+      String lastKnownResourceID =
+          failoverMetadata.isPresent() ? 
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
+      initiateStandbyAwareAllocation(containerID, lastKnownResourceID, 
containerAllocator);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     }
   }
@@ -368,9 +416,10 @@ public class StandbyContainerManager {
       ResourceRequestState resourceRequestState) {
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
-      handleExpiredRequestForStandbyContainer(containerID, request, 
alternativeResource, containerAllocator, resourceRequestState);
+      handleExpiredRequestForStandbyContainer(containerID, request, 
alternativeResource, containerAllocator,
+          resourceRequestState);
     } else {
-      handleExpiredRequestForActiveContainer(containerID, request, 
alternativeResource, containerAllocator, resourceRequestState);
+      handleExpiredRequestForActiveContainer(containerID, request, 
containerAllocator, resourceRequestState);
     }
   }
 
@@ -382,10 +431,10 @@ public class StandbyContainerManager {
     if (alternativeResource.isPresent()) {
       // A standby container can be started on the 
anyhost-alternative-resource rightaway provided it passes all the
       // standby constraints
-      log.info("Handling expired request, standby container {} can be started 
on alternative resource {}", containerID, alternativeResource.get());
+      log.info("Handling expired request, standby container {} can be started 
on alternative resource {}", containerID,
+          alternativeResource.get());
       checkStandbyConstraintsAndRunStreamProcessor(request, 
ResourceRequestState.ANY_HOST, alternativeResource.get(),
           containerAllocator, resourceRequestState);
-
     } else {
       // If there is no alternative-resource for the standby container we make 
a new anyhost request
       log.info("Handling expired request, requesting anyHost resource for 
standby container {}", containerID);
@@ -396,45 +445,20 @@ public class StandbyContainerManager {
 
   // Handle an expired resource request that was made for placing an active 
container
   private void handleExpiredRequestForActiveContainer(String containerID, 
SamzaResourceRequest request,
-      Optional<SamzaResource> alternativeResource, AbstractContainerAllocator 
containerAllocator,
-      ResourceRequestState resourceRequestState) {
+      AbstractContainerAllocator containerAllocator, ResourceRequestState 
resourceRequestState) {
 
+    log.info("Handling expired request for active container {}", containerID);
     Optional<FailoverMetadata> failoverMetadata = getFailoverMetadata(request);
-
-    // An active container can be started on the alternative-any-host resource 
rightaway, if it has no prior failure,
-    // that is, there is no failoverMetadata associated with this 
resource-request
-    if (alternativeResource.isPresent() && !failoverMetadata.isPresent()) {
-
-      log.info("Handling expired request, trying to run active container {} on 
alternative resource {}", containerID, alternativeResource.get());
-
-      checkStandbyConstraintsAndRunStreamProcessor(request, 
ResourceRequestState.ANY_HOST, alternativeResource.get(),
-          containerAllocator, resourceRequestState);
-
-    } else if (!failoverMetadata.isPresent() && 
!alternativeResource.isPresent()) {
-    // An active container has no prior failure, and there is 
no-alternative-anyhost resource, so we make a new anyhost request
-      log.info("Handling expired request, requesting anyHost resource for 
active container {} because this active container has never failed", 
containerID);
-
-      resourceRequestState.cancelResourceRequest(request);
-      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
-
-    } else if (failoverMetadata.isPresent()) {
-      // An active container that had failed, and whose subsequent resource 
request has expired, needs to be failed over to
-      // a new standby-candidate, so we initiate a failover
-
-      log.info("Handling expired request, initiating failover for active 
container {}", containerID);
-
-      resourceRequestState.cancelResourceRequest(request);
-
-      // we use the activeContainer's resourceID to initiate the failover
-      String lastKnownResourceID = 
failoverMetadata.get().activeContainerResourceID;
-      initiateActiveContainerFailover(containerID, lastKnownResourceID, 
containerAllocator);
-
-    } else {
-      log.error("Handling expired request, invalid state containerID {}, 
resource request {}", containerID, request);
-    }
+    resourceRequestState.cancelResourceRequest(request);
+
+    // we use the activeContainer's resourceID to initiate the failover and 
index failover-state
+    // if there is no prior failure for this active-Container, we use "unknown-" + containerID
+    String lastKnownResourceID =
+        failoverMetadata.isPresent() ? 
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
+    log.info("Handling expired request for active container {}, 
lastKnownResourceID is {}", containerID, lastKnownResourceID);
+    initiateStandbyAwareAllocation(containerID, lastKnownResourceID, 
containerAllocator);
   }
 
-
   /**
    * Check if a activeContainerResource has failover-metadata associated with 
it
    */
@@ -455,7 +479,6 @@ public class StandbyContainerManager {
     return Optional.empty();
   }
 
-
   @Override
   public String toString() {
     return this.failovers.toString();
@@ -501,11 +524,18 @@ public class StandbyContainerManager {
       this.resourceRequests.add(samzaResourceRequest);
     }
 
-    // Check if this resource request is used for this failover
+    // Check if this resource request has been issued in this failover
     public synchronized boolean containsResourceRequest(SamzaResourceRequest 
samzaResourceRequest) {
       return this.resourceRequests.contains(samzaResourceRequest);
     }
 
+    // Check if this standby-host has been used in this failover, that is, if 
this host matches the host of a selected-standby,
+    // or if we have made a resourceRequest for this host
+    public boolean isStandbyHostUsed(String standbyHost) {
+      return this.selectedStandbyContainers.values().contains(standbyHost)
+          || this.resourceRequests.stream().filter(request -> 
request.getPreferredHost().equals(standbyHost)).count() > 0;
+    }
+
     @Override
     public String toString() {
       return "[activeContainerID: " + this.activeContainerID + " 
activeContainerResourceID: "
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 6d0fdb1..3a66573 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -563,11 +563,6 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     if (samzaContainerId != null) {
       YarnContainer container = 
state.runningYarnContainers.get(samzaContainerId);
       log.info("Failed Stop on Yarn Container: {} had Samza ContainerId: {} ", 
containerId.toString(), samzaContainerId);
-      SamzaResource resource = new 
SamzaResource(container.resource().getVirtualCores(),
-          container.resource().getMemory(), container.nodeId().getHost(), 
containerId.toString());
-
-      log.info("Re-invoking stop stream processor for container: {}", 
containerId);
-      this.stopStreamProcessor(resource);// For now, we retry the stopping of 
the container
     } else {
       log.info("Got an invalid notification for container: {}", 
containerId.toString());
     }

Reply via email to