Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container 
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r349276893
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 ##########
 @@ -161,4 +237,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String processorId, String pref
       allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
     }
   }
+
+  /**
+   * Registers a container placement action to move the running container to destination host, if destination host is same as the
+   * host on which container is running, container placement action is treated as a restart.
+   *
+   * When host affinity is disabled a move / restart is only allowed on ANY_HOST
+   * When host affinity is enabled move / restart is allowed on specific or ANY_HOST
+   * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+   *
+   * @param processorId logical id of the container 0, 1, 2
+   * @param destinationHost host where container is desired to be moved, acceptable values of this param are any valid
+   *                        hostname or "ANY_HOST"(in this case the request is sent to resource manager for any host)
+   * @param containerAllocator to request physical resources
+   */
+  public ContainerPlacementStatus registerContainerPlacementAction(String processorId, String destinationHost,
+      ContainerAllocator containerAllocator, Optional<Long> requestExpiry) {
+    LOG.info("Received ControlAction request to move or restart container with processor id {} to host {}", processorId, destinationHost);
+    ContainerPlacementStatus actionStatus = checkValidControlAction(processorId, destinationHost);
+    if (actionStatus.status == ContainerPlacementStatus.StatusCode.BAD_REQUEST) {
+      return actionStatus;
+    }
+
+    SamzaResource currentResource = samzaApplicationState.runningProcessors.get(processorId);
+    LOG.info("Processor ID: {} matched a active container with deployment ID: {} running on host: {}", processorId,
+        currentResource.getContainerId(), currentResource.getHost());
+
+    if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+      LOG.info("Changing the requested host to {} because either it is requested or host affinity is disabled",
+          ResourceRequestState.ANY_HOST);
+      destinationHost = ANY_HOST;
+    }
+
+    SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(processorId, destinationHost);
+    ContainerPlacementMetadata actionMetaData =
+        new ContainerPlacementMetadata(processorId, currentResource.getContainerId(), currentResource.getHost(),
+            destinationHost, actionStatus, requestExpiry.isPresent() ? requestExpiry.get() : DEFAULT_CONTROL_ACTION_EXPIRY);
+
+    // Record the resource request for monitoring
+    actionMetaData.setActionStatus(ContainerPlacementStatus.StatusCode.IN_PROGRESS);
+    actionMetaData.recordResourceRequest(resourceRequest);
+    actions.put(processorId, actionMetaData);
+    // note this also updates state.preferredHost count
+    containerAllocator.issueResourceRequest(resourceRequest);
+    LOG.info("Control action with metadata {} and issued a request for resources in progress", actionMetaData);
+    return actionStatus;
+  }
+
+  public Optional<Long> getActionExpiryTimeout(String processorId) {
+    return this.actions.containsKey(processorId) ? Optional.of(
+        this.actions.get(processorId).getRequestActionExpiryTimeout()) : Optional.empty();
 
 Review comment:
   I removed the default timeout that was defined for the action; it was 
being used as the stop timeout. The default timeout for container expiry 
requests is defined in ContainerAllocator, so when this value is empty, that 
default gets applied.
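
   To illustrate the fallback described above, here is a minimal sketch, not 
the code in this PR. The class name, the registry map, the method 
effectiveExpiryMs and the 5000 ms default are all hypothetical stand-ins; the 
real default lives in ContainerAllocator and its value is not shown in this 
diff. The sketch only shows the pattern: keep an expiry when the caller 
supplied one, return Optional.empty() otherwise, and let the consumer apply 
its own default.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ExpiryFallbackSketch {
  // Hypothetical stand-in for the default that ContainerAllocator owns; the value is an assumption.
  public static final long ALLOCATOR_DEFAULT_EXPIRY_MS = 5000L;

  // Hypothetical registry: processor ID -> expiry, stored only when the caller supplied one.
  private final Map<String, Long> actionExpiryMs = new ConcurrentHashMap<>();

  // Records the expiry for a placement action; an empty Optional stores nothing.
  public void registerAction(String processorId, Optional<Long> requestExpiry) {
    requestExpiry.ifPresent(expiry -> actionExpiryMs.put(processorId, expiry));
  }

  // Returns the action-specific expiry, or empty when none was registered.
  public Optional<Long> getActionExpiryTimeout(String processorId) {
    return Optional.ofNullable(actionExpiryMs.get(processorId));
  }

  // Consumer side: fall back to the allocator-level default when the Optional is empty.
  public long effectiveExpiryMs(String processorId) {
    return getActionExpiryTimeout(processorId).orElse(ALLOCATOR_DEFAULT_EXPIRY_MS);
  }

  public static void main(String[] args) {
    ExpiryFallbackSketch sketch = new ExpiryFallbackSketch();
    sketch.registerAction("0", Optional.of(30000L)); // caller supplied an expiry
    sketch.registerAction("1", Optional.empty());    // caller supplied none
    System.out.println(sketch.effectiveExpiryMs("0")); // prints 30000
    System.out.println(sketch.effectiveExpiryMs("1")); // prints 5000
  }
}
```

   Keeping the default in a single place (the allocator, in this PR) means 
the placement metadata never has to duplicate it.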

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
