vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256083123
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##########
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest 
request) {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby 
constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a 
launch-fail or request expiry).
+  private boolean 
checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+      SamzaResource samzaResource, SamzaApplicationState state) {
+
+    // If standby tasks are not enabled run streamprocessor and return true
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource, state)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on preferred host {} meets standby 
constraints, launching on {}", containerID,
+          preferredHost, samzaResource.getHost());
+      runStreamProcessor(request, preferredHost);
+      state.successfulStandbyAllocations.incrementAndGet();
+      return true;
+    } else {
+      // This resource cannot be used to launch this container, so we treat it 
like a launch fail, and issue an ANY_HOST request
+      log.info("Running container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can 
be met on this resource, given standby
+  // container constraints, and the current set of pending and running 
containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List<String> containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Check if any of these conflicting containers are running/launching on 
host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = 
samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the 
host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or 
resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling rerequesting for container {} using an any host 
request");
+      super.requestResource(containerID, ResourceRequestState.ANY_HOST); // 
proceed with a the anyhost request
+    } else {
+      requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke 
local method & select a new standby if possible
+    }
+  }
+
+  // Intercept issuing of resource requests from the CPM
+  // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+  // 2. for a StandbyContainer (after it has been chosen for failover), to put 
the active on the standby's host
+  // and request another resource for the standby
+  // 3. for a standbyContainer (if not for a failover)
+  @Override
+  public void requestResource(String containerID, String preferredHost) {
+
+    // If StandbyTasks are not enabled, we simply forward the resource requests
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      super.requestResource(containerID, preferredHost);
+      return;
+    }
+
+    // If its an anyhost request for an active container, then we select a 
standby container to stop and place this activeContainer on that standby's host
+    // we may have already chosen a standby (which didnt work for a failover)
+    if (!StandbyTaskUtil.isStandbyContainer(containerID) && 
preferredHost.equals(ResourceRequestState.ANY_HOST)) {
+      initiateActiveContainerFailover(containerID);
+    } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
 
 Review comment:
   Move the failover handling logic upstream, perhaps into the
`ContainerProcessManager`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to