lakshmi-manasa-g commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r545335678



##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -381,6 +428,9 @@ public final void issueResourceRequest(SamzaResourceRequest request) {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {

Review comment:
       Is it possible for getFaultDomains to return null? Asking because the expired metric increment checks for null.
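
   If null is possible, one option is a null-safe check shared by both call sites. A minimal sketch, assuming getFaultDomains returns a Set<String> (the return type is not visible in this hunk); the class and the helper name hasFaultDomains are hypothetical:

```java
import java.util.Collections;
import java.util.Set;

final class FaultDomainChecks {
  private FaultDomainChecks() { }

  // Hypothetical helper: treats null and empty the same way, so callers do
  // not need to know whether getFaultDomains() can return null.
  static boolean hasFaultDomains(Set<String> faultDomains) {
    return faultDomains != null && !faultDomains.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(hasFaultDomains(null));
    System.out.println(hasFaultDomains(Collections.<String>emptySet()));
    System.out.println(hasFaultDomains(Collections.singleton("rack-1")));
  }
}
```

   Using the same helper for the request metric and the expired metric would keep the two increments consistent.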

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.

Review comment:
       nit: this reads as though it means "# of requests per container", whereas we want to communicate "# of requests by job", right? I don't feel strongly about this, so it's okay to drop. If you do change it, please do the same for the rest of the Javadocs below.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -409,16 +470,18 @@ public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest re
       log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();

Review comment:
       This will increment and emit the metric even when config=off, right?
   Are we okay with that?
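
   If we are not okay with it, the increment could be guarded by the flag. A minimal sketch, assuming the flag is read once and held in a boolean; the class GuardedMetricSketch and the method onContainerStarted are hypothetical stand-ins, not code from this PR:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedMetricSketch {
  // Stand-in for samzaApplicationState.faultDomainAwareContainersStarted.
  final AtomicInteger faultDomainAwareContainersStarted = new AtomicInteger(0);

  // Stand-in for the value of getFaultDomainAwareStandbyEnabled().
  private final boolean faultDomainAwareStandbyEnabled;

  GuardedMetricSketch(boolean faultDomainAwareStandbyEnabled) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
  }

  // Hypothetical hook called where the container is started.
  void onContainerStarted() {
    if (faultDomainAwareStandbyEnabled) {
      faultDomainAwareContainersStarted.incrementAndGet();
    }
  }

  public static void main(String[] args) {
    GuardedMetricSketch off = new GuardedMetricSketch(false);
    off.onContainerStarted();
    System.out.println(off.faultDomainAwareContainersStarted.get());
  }
}
```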

##########
File path: samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
##########
@@ -250,6 +258,14 @@ public String getContainerManagerClass() {
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
+  public String getFaultDomainManagerClass() {
+    return get(FAULT_DOMAIN_MANAGER_FACTORY, FAULT_DOMAIN_MANAGER_FACTORY_DEFAULT);
+  }
+
+  public boolean getFaultDomainAwareStandbyEnabled() {
+    return getBoolean(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, false);

Review comment:
       nit: might be nice to keep the default value in a constant, e.g. FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false.
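
   Roughly what I have in mind, as a self-contained sketch. The class ClusterManagerConfigSketch, its map-backed getBoolean, and the key string are placeholders for illustration (the real key is not shown in this hunk); only the constant name follows the suggestion above:

```java
import java.util.HashMap;
import java.util.Map;

final class ClusterManagerConfigSketch {
  // Placeholder key; the actual config key is not visible in this diff.
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "example.fault-domain-aware.standby.enabled";
  // Named default, so the value is documented next to the key.
  static final boolean FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT = false;

  private final Map<String, String> values;

  ClusterManagerConfigSketch(Map<String, String> values) {
    this.values = new HashMap<>(values);
  }

  // Simplified stand-in for a config getBoolean(key, default) lookup.
  boolean getBoolean(String key, boolean defaultValue) {
    String raw = values.get(key);
    return raw == null ? defaultValue : Boolean.parseBoolean(raw);
  }

  boolean getFaultDomainAwareStandbyEnabled() {
    return getBoolean(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, FAULT_DOMAIN_AWARE_STANDBY_ENABLED_DEFAULT);
  }

  public static void main(String[] args) {
    ClusterManagerConfigSketch config = new ClusterManagerConfigSketch(new HashMap<>());
    System.out.println(config.getFaultDomainAwareStandbyEnabled());
  }
}
```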

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +421,39 @@ boolean checkStandbyConstraints(String containerIdToStart, String host) {
       SamzaResource resource = samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, containerID)) {
         return false;
       }
     }
 
     return true;
   }
 
+  boolean checkStandbyConstraintsHelper(String containerIdToStart, String hostToStartContainerOn, SamzaResource existingResource, String existingContainerID) {
+    if (existingResource != null) {
+      ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
+      if (clusterManagerConfig.getFaultDomainAwareStandbyEnabled() && faultDomainManager.hasSameFaultDomains(hostToStartContainerOn, existingResource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this rack",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+                containerIdToStart, hostToStartContainerOn, existingContainerID);

Review comment:
       I'm a little confused about how the logging is impacted.
   Earlier it was
   `if (resource != null && resource.getHost().equals(host)) {
           log.info("..running on this host",`
   
   Now it is
   `if (existingResource != null) {
         // if rack aware enabled
         } else if (existingResource.getHost().equals(hostToStartContainerOn)) {
           log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host"`
           
     Did you mean for the exact log message to change?
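
     If the old wording should be preserved, the helper could take the state of the existing container as a parameter, so the pending check still logs "scheduled" and the running check still logs "running". A rough sketch only; the class, the method conflictMessage, and the existingState parameter are hypothetical, and String.format stands in for the logger to keep the example self-contained:

```java
final class StandbyConstraintLogSketch {
  private StandbyConstraintLogSketch() { }

  // Hypothetical: existingState would be "scheduled" for pendingProcessors
  // and "running" for runningProcessors, matching the two original messages.
  static String conflictMessage(String containerIdToStart, String host,
      String existingContainerID, String existingState) {
    return String.format(
        "Container %s cannot be started on host %s because container %s is already %s on this host",
        containerIdToStart, host, existingContainerID, existingState);
  }

  public static void main(String[] args) {
    System.out.println(conflictMessage("1", "host-a", "0", "scheduled"));
    System.out.println(conflictMessage("1", "host-a", "0", "running"));
  }
}
```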

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
##########
@@ -170,6 +170,26 @@
    */
   public final AtomicInteger failedContainerPlacementActions = new AtomicInteger(0);
 
+  /**
+   * Number of fault domain aware container requests made for a container.
+   */
+  public final AtomicInteger faultDomainAwareContainerRequests = new AtomicInteger(0);
+
+  /**
+   * Number of fault domain aware container requests made for a container.

Review comment:
       nit: this Javadoc is a copy of the one above :P

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -181,13 +207,14 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST);
+      containerAllocator.requestResource(standbyContainerID, ResourceRequestState.ANY_HOST,

Review comment:
       I feel this should also be guarded by the config: we would want the flow to be exactly the same as earlier when the config is off, right?
   The same goes for the other requests made.
   That said, since YarnClusterResourceManager honors the fault domains only when config = on, I'm okay with dropping this: although the code changes when config=off, the flow does not.
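
   For reference, the kind of guard I mean, as a sketch only. It assumes the third argument to requestResource is a Set<String> of fault domains (the argument is cut off in this hunk); the class and the helper faultDomainsForRequest are hypothetical:

```java
import java.util.Collections;
import java.util.Set;

final class GuardedRequestSketch {
  private GuardedRequestSketch() { }

  // Hypothetical helper: when the feature is off, pass an empty set so the
  // request is indistinguishable from the pre-change ANY_HOST request.
  static Set<String> faultDomainsForRequest(boolean faultDomainAwareStandbyEnabled,
      Set<String> allowedFaultDomains) {
    return faultDomainAwareStandbyEnabled ? allowedFaultDomains : Collections.<String>emptySet();
  }

  public static void main(String[] args) {
    Set<String> racks = Collections.singleton("rack-2");
    System.out.println(faultDomainsForRequest(false, racks));
    System.out.println(faultDomainsForRequest(true, racks));
  }
}
```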





