rmatharu commented on a change in pull request #1281: SAMZA-2378: Container 
Placements support for Standby containers enabled jobs
URL: https://github.com/apache/samza/pull/1281#discussion_r382824513
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 ##########
 @@ -435,32 +459,59 @@ private boolean 
deQueueAction(ContainerPlacementRequestMessage requestMessage) {
 
   /**
    * A valid container placement action needs a valid processor id. Duplicate 
actions are handled by de-duping on uuid.
+   * If standby containers are enabled destination host requested must meet 
standby constraints
    *
    * @param requestMessage container placement request message
    * @return Pair<ContainerPlacementMessage.StatusCode, String> which is 
status code & response suggesting if the request is valid
    */
   private Pair<ContainerPlacementMessage.StatusCode, String> 
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
-    String errorMessagePrefix = String.format("ContainerPlacement request: %s 
is rejected due to", requestMessage);
+    String errorMessagePrefix = 
ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: ";
     Boolean invalidAction = false;
     String errorMessage = null;
-    if (standbyContainerManager.isPresent()) {
-      errorMessage = String.format("%s not supported for hot standby enabled", 
errorMessagePrefix);
+    if 
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
 &&
+        
!samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId())
+    ) {
+      errorMessage = String.format("%s invalid processor id neither in running 
or pending processors", errorMessagePrefix);
       invalidAction = true;
     } else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
       errorMessage = String.format("%s duplicate UUID of the request, please 
retry", errorMessagePrefix);
       invalidAction = true;
-    } else if (Integer.parseInt(requestMessage.getProcessorId()) >= 
samzaApplicationState.processorCount.get()
-    ) {
-      errorMessage = String.format("%s invalid processor id", 
errorMessagePrefix);
+    } else if (standbyContainerManager.isPresent() && 
!standbyContainerManager.get()
+        .checkStandbyConstraints(requestMessage.getProcessorId(), 
requestMessage.getDestinationHost())) {
+      errorMessage = String.format("%s destination host does not meet standby 
constraints", errorMessagePrefix);
       invalidAction = true;
     }
 
     if (invalidAction) {
-      LOG.info(errorMessage);
       return new 
ImmutablePair<>(ContainerPlacementMessage.StatusCode.BAD_REQUEST, errorMessage);
     }
 
     return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.ACCEPTED, 
"Request is accepted");
   }
 
+  /**
+   * An active and a standby container cannot have a concurrent placement 
action in flight, if they do actions are taken
+   * serially in the order of timestamps
+   */
+  private boolean checkStandbyOrActiveContainerHasActivePlacementAction(String 
processorId) {
+    if (standbyContainerManager.isPresent()) {
+      // If requested placement action is on a standby container and its 
active container has a placement request,
+      // this request shall not be de-queued until in-flight action on active 
container is complete
+      if (StandbyTaskUtil.isStandbyContainer(processorId) && 
hasActiveContainerPlacementAction(
+          StandbyTaskUtil.getActiveContainerId(processorId))) {
+        return true;
+      }
+      // If requested placement action is on a standby container and its 
active container has a placement request,
+      // this request shall not be de-queued until in-flight action on active 
container is complete
+      if (!StandbyTaskUtil.isStandbyContainer(processorId)) {
+        for (String standby : 
standbyContainerManager.get().getStandbyList(processorId)) {
+          if (hasActiveContainerPlacementAction(standby)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
 
 Review comment:
   This method can be simplified/inlined with hasActiveAction?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to