PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546188838



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String 
containerIdToStart, String host) {
       SamzaResource resource = 
samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the 
host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String 
hostToStartContainerOn, SamzaResource existingResource, String 
existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && 
faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, 
existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, 
existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, 
existingContainerID);

Review comment:
       Thanks for noticing the logging difference. I've added a parameter to 
the helper method to log differently in each case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to