PawasChhokra commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546188896



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String 
containerIdToStart, String host) {
       SamzaResource resource = 
samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the 
host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String 
hostToStartContainerOn, SamzaResource existingResource, String 
existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && 
faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, 
existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, 
existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, 
existingContainerID);

Review comment:
       Instead of making this inline, I added a parameter for the log message to 
avoid code duplication and also made `ClusterManagerConfig` a parameter. Let me 
know if that makes sense to you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to