Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container 
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r352929271
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 ##########
 @@ -161,4 +262,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String 
processorId, String pref
       allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
     }
   }
+
+  /**
+   * Registers a container placement action to move the running container to 
destination host, if destination host is same as the
+   * host on which container is running, container placement action is treated 
as a restart.
+   *
+   * When host affinity is disabled a move / restart is only allowed on 
ANY_HOST
+   * When host affinity is enabled move / restart is allowed on specific or 
ANY_HOST
+   * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+   *
+   * @param requestMessage request containing logical processor id 0,1,2 and 
host where container is desired to be moved,
+   *                       acceptable values of this param are any valid 
hostname or "ANY_HOST"(in this case the request
+   *                       is sent to resource manager for any host)
+   * @param containerAllocator to request physical resources
+   */
+  public void 
registerContainerPlacementAction(ContainerPlacementRequestMessage 
requestMessage, ContainerAllocator containerAllocator) {
+    LOG.info("Received ControlAction request to move or restart container with 
processor id {} to host {}",
+        requestMessage.getProcessorId(), requestMessage.getDestinationHost());
+    String processorId = requestMessage.getProcessorId();
+    String destinationHost = requestMessage.getDestinationHost();
+    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
+        checkValidControlAction(processorId, destinationHost, 
requestMessage.getUuid());
+
+    // Request is bad just update the response on message & return
+    if (actionStatus.getKey() == 
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+      return;
+    }
+
+    SamzaResource currentResource = 
samzaApplicationState.runningProcessors.get(processorId);
+    LOG.info("Processor ID: {} matched a active container with deployment ID: 
{} running on host: {}", processorId,
+        currentResource.getContainerId(), currentResource.getHost());
+
+    if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+      LOG.info("Changing the requested host to {} because either it is 
requested or host affinity is disabled",
+          ResourceRequestState.ANY_HOST);
+      destinationHost = ANY_HOST;
+    }
+
+    SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(processorId, destinationHost);
+    Optional<Duration> requestExpiry =
+        requestMessage.getRequestExpiry() != null ? 
Optional.of(Duration.ofMillis(requestMessage.getRequestExpiry())) : 
Optional.empty();
+    ContainerPlacementMetadata actionMetaData =
+        new ContainerPlacementMetadata(requestMessage.getUuid(), processorId, 
currentResource.getContainerId(),
+            currentResource.getHost(), destinationHost, requestExpiry);
+
+    // Record the resource request for monitoring
+    
actionMetaData.setActionStatus(ContainerPlacementMessage.StatusCode.IN_PROGRESS,
 "Preferred Resources requested");
+    actionMetaData.recordResourceRequest(resourceRequest);
+    actions.put(processorId, actionMetaData);
+    containerAllocator.issueResourceRequest(resourceRequest);
+    LOG.info("Container Placement action with metadata {} and issued a request 
for resources in progress",
+        actionMetaData);
+  }
+
+  @VisibleForTesting
+  ContainerPlacementMetadata 
registerContainerPlacementActionForTest(ContainerPlacementRequestMessage 
requestMessage, ContainerAllocator containerAllocator) {
+    registerContainerPlacementAction(requestMessage, containerAllocator);
+    if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
+      return getPlacementActionMetadata(requestMessage.getProcessorId()).get();
+    }
+    return null;
+  }
+
+  public Optional<Duration> getActionExpiryTimeout(String processorId) {
+    return hasActiveContainerPlacementAction(processorId) ?
+        
getPlacementActionMetadata(processorId).get().getRequestActionExpiryTimeout() : 
Optional.empty();
+  }
+
+  private void handleControlActionFailure(String processorId, 
ContainerPlacementMetadata metaData, String failureMessage) {
+    metaData.setActionStatus(ContainerPlacementMessage.StatusCode.FAILED, 
failureMessage);
+    LOG.info("Container Placement action failed with metadata {}", metaData);
+  }
+
+  /**
+   * A ContainerPlacementAction is only active if it is either CREATED, 
ACCEPTED or IN_PROGRESS
+   */
+  private Boolean hasActiveContainerPlacementAction(String processorId) {
+    Optional<ContainerPlacementMetadata> metadata = 
getPlacementActionMetadata(processorId);
+    if (metadata.isPresent()) {
+      ContainerPlacementMessage.StatusCode actionStatus = 
metadata.get().getActionStatus();
+      return (actionStatus == ContainerPlacementMessage.StatusCode.CREATED) ||
+          (actionStatus == ContainerPlacementMessage.StatusCode.ACCEPTED) ||
+          (actionStatus == ContainerPlacementMessage.StatusCode.IN_PROGRESS);
 
 Review comment:
   Simplified it: I only need to check IN_PROGRESS, so I removed the redundant checks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to