This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 29f2ef7  SAMZA-2579: Force restart feature for Container Placements 
(#1414)
29f2ef7 is described below

commit 29f2ef7cd9169ce8e0b474228c59a3a1da71e5c2
Author: Sanil Jain <[email protected]>
AuthorDate: Mon Aug 24 11:59:26 2020 -0700

    SAMZA-2579: Force restart feature for Container Placements (#1414)
    
    Changes: The current restart ability for container placements works in the 
following way:
    
    Tries to fetch resources on a host
    Stops the active container if resources are accrued
    Tries to start the container on the host where resources were accrued
    In production at LinkedIn, we have observed the following:
    
    Some jobs are configured to use resources for the peak which leads to no 
headroom left on a host for requesting additional resources
    This leads to restart requests failing because resources cannot be acquired on 
that host
    A fix for this is to implement a force-restart utility. In this version we 
will stop the container first and then accrue resources. The upside is that we 
will at least free up the resources on the host before issuing the resource 
request; the downside is that bringing the container back up on that host is 
best-effort only
    
    API Changes: Added new param values to destinationHost param for container 
placement request message
    
    LAST_SEEN: Tries to restart a container on last seen host with RESERVE -> 
STOP -> MOVE policy
    
    FORCE_RESTART_LAST_SEEN: Tries to restart a container on last seen host 
with STOP -> RESERVE -> MOVE policy
---
 .../samza/clustermanager/ContainerManager.java     | 39 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 70a050c..2730c0c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -54,6 +54,8 @@ public class ContainerManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerManager.class);
   private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+  private static final String LAST_SEEN = "LAST_SEEN";
+  private static final String FORCE_RESTART_LAST_SEEN = 
"FORCE_RESTART_LAST_SEEN";
   private static final int UUID_CACHE_SIZE = 20000;
 
   /**
@@ -366,9 +368,18 @@ public class ContainerManager {
    * Container placement requests are tied to deploymentId which is currently 
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
    * On job restarts container placement requests queued for the previous 
deployment are deleted using this
    *
+   * All kinds of container placement request except for when destination host 
is "FORCE_RESTART_LAST_SEEN" work with
+   * a RESERVE - STOP - START policy, which means resources are accrued first 
before issuing a container stop, failure to
+   * do so will leave the running container untouched. Requests with 
destination host "FORCE_RESTART_LAST_SEEN" works with
+   * STOP - RESERVE - START policy, which means running container is stopped 
first then resource request are issued, this case
+   * is equivalent to doing a kill -9 on a container
+   *
    * @param requestMessage request containing logical processor id 0,1,2 and 
host where container is desired to be moved,
-   *                       acceptable values of this param are any valid 
hostname or "ANY_HOST"(in this case the request
-   *                       is sent to resource manager for any host)
+   *                       acceptable values of this param are
+   *                       - valid hostname
+   *                       - "ANY_HOST" in this case the request is sent to 
resource manager for any host
+   *                       - "LAST_SEEN" in this case request is sent to 
resource manager for last seen host
+   *                       - "FORCE_RESTART_LAST_SEEN" in this case request is 
sent to resource manager for last seen host
    * @param containerAllocator to request physical resources
    */
   public void 
registerContainerPlacementAction(ContainerPlacementRequestMessage 
requestMessage, ContainerAllocator containerAllocator) {
@@ -391,6 +402,30 @@ public class ContainerManager {
       return;
     }
 
+    /*
+     * When destination host is {@code FORCE_RESTART_LAST_SEEN} it is treated as 
equivalent to a kill -9 operation for the container
+     * In this scenario container is stopped first and we fallback to normal 
restart path so the policy here is
+     * stop - reserve - move
+     */
+    if (destinationHost.equals(FORCE_RESTART_LAST_SEEN)) {
+      LOG.info("Issuing a force restart for Processor ID: {} for 
ContainerPlacement action request {}", processorId, requestMessage);
+      
clusterResourceManager.stopStreamProcessor(samzaApplicationState.runningProcessors.get(processorId));
+      writeContainerPlacementResponseMessage(requestMessage, 
ContainerPlacementMessage.StatusCode.SUCCEEDED,
+          "Successfully issued a stop container request falling back to normal 
restart path");
+      return;
+    }
+
+    /**
+     * When destination host is {@code LAST_SEEN} its treated as a restart 
request on the host where container is running
+     * on or has been seen last, but in this policy would be reserve - stop - 
move, which means reserve resources first
+     * only if resources are accrued stop the active container and issue a 
start on it on resource acquired
+     */
+    if (destinationHost.equals(LAST_SEEN)) {
+      String lastSeenHost = getSourceHostForContainer(requestMessage);
+      LOG.info("Changing the requested host for placement action to {} because 
requested host is LAST_SEEN", lastSeenHost);
+      destinationHost = lastSeenHost;
+    }
+
     // TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers 
to specific host
     if (!hostAffinityEnabled) {
       LOG.info("Changing the requested host for placement action to {} because 
host affinity is disabled", ResourceRequestState.ANY_HOST);

Reply via email to