mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546927562



##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,13 @@
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager 
localityManager) {
+  // FaultDomainManager, used to get fault domain information of different 
hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final Config config;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, 
ClusterResourceManager clusterResourceManager,
+                                 FaultDomainManager faultDomainManager, 
LocalityManager localityManager, Config config) {

Review comment:
       Consistency: keep the new parameters at the end.
   
   

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -469,6 +566,10 @@ void releaseUnstartableContainer(SamzaResourceRequest 
request, SamzaResource res
     resourceRequestState.cancelResourceRequest(request);
   }
 
+  private boolean isFaultDomainAwareStandbyEnabled() {
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+    return clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
+  }

Review comment:
       Could we make this more efficient by determining whether it is enabled in 
the constructor and storing the result in an instance variable?
   Storing config seems unnecessary, and creating a ClusterManagerConfig on the 
fly for every invocation seems sub-optimal.
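
   A minimal sketch of what I mean, assuming the flag can be resolved once at 
construction time. `SketchClusterManagerConfig`, `SketchStandbyContainerManager` 
and the config key are hypothetical stand-ins (a plain `Map` replaces `Config`), 
not the real Samza classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterManagerConfig; the key name is made up for this sketch.
final class SketchClusterManagerConfig {
  static final String FAULT_DOMAIN_AWARE_STANDBY_ENABLED = "sketch.fault-domain-aware.standby.enabled";

  private final Map<String, String> config;

  SketchClusterManagerConfig(Map<String, String> config) {
    this.config = config;
  }

  boolean getFaultDomainAwareStandbyEnabled() {
    return Boolean.parseBoolean(config.getOrDefault(FAULT_DOMAIN_AWARE_STANDBY_ENABLED, "false"));
  }
}

// Hypothetical, trimmed-down manager: only the flag handling is shown.
final class SketchStandbyContainerManager {
  // Resolved once in the constructor; the config itself is not stored.
  private final boolean faultDomainAwareStandbyEnabled;

  SketchStandbyContainerManager(Map<String, String> config) {
    this.faultDomainAwareStandbyEnabled =
        new SketchClusterManagerConfig(config).getFaultDomainAwareStandbyEnabled();
  }

  boolean isFaultDomainAwareStandbyEnabled() {
    // No config object is created per invocation; this is a plain field read.
    return faultDomainAwareStandbyEnabled;
  }
}

final class CachedFlagSketchDemo {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(SketchClusterManagerConfig.FAULT_DOMAIN_AWARE_STANDBY_ENABLED, "true");
    SketchStandbyContainerManager manager = new SketchStandbyContainerManager(config);
    System.out.println(manager.isFaultDomainAwareStandbyEnabled());
  }
}
```

   With this shape the `config` field can be dropped from the class entirely.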
   

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -233,9 +275,13 @@ private void initiateStandbyAwareAllocation(String 
activeContainerID, String res
             standbyHost, activeContainerID, standbyHost, resourceID);
         FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(activeContainerID, resourceID);
 
+        Set<FaultDomain> allowedFaultDomains = new HashSet<>();
+        if (isFaultDomainAwareStandbyEnabled()) {
+          allowedFaultDomains = 
getAllowedFaultDomainsForStandbyContainerGivenContainerId(activeContainerID);
+        }
         // record the resource request, before issuing it to avoid race with 
allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, 
standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, 
standbyHost, allowedFaultDomains);

Review comment:
       This is boilerplate, since the function introduced below in this PR 
already does this. 
   Context: `checkFaultDomainAwarenessEnabledAndRequestResource`

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +454,39 @@ boolean checkStandbyConstraints(String 
containerIdToStart, String host) {
       SamzaResource resource = 
samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the 
host
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID, "pending")) {
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
-        log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
-            containerIdToStart, host, containerID);
+      if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource, 
containerID, "running")) {

Review comment:
       This still feels unclean to me, especially passing in parameters that 
are not relevant to the core of what the method is responsible for, 
e.g., "pending" vs. "running", which is purely for logging and isn't state per 
se; one only realizes this after reading through the code underneath.
   
   If you don't want to inline it and still feel a refactor would help make it 
clearer, I'd suggest extracting the existing logic into 
`checkActiveAndStandbyOnSameHost` and adding your logic to 
`checkActiveAndStandbyOnSameFaultDomain`. Within `checkStandbyConstraints`, 
you can then run both of these checks, or only one of them, depending on whether 
fault domain awareness is enabled.
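
   Roughly the split I have in mind, as a sketch only. `SketchStandbyConstraints` 
and its two maps are hypothetical stand-ins for the pending/running processor 
state and the `FaultDomainManager`; treating any shared rack as a conflict, and 
always running the same-host check, are assumptions of the sketch:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the standby constraint checks.
final class SketchStandbyConstraints {
  private final Map<String, String> containerToHost;   // stand-in for pending/running processors
  private final Map<String, Set<String>> hostToRacks;  // stand-in for FaultDomainManager
  private final boolean faultDomainAwareStandbyEnabled;

  SketchStandbyConstraints(Map<String, String> containerToHost, Map<String, Set<String>> hostToRacks,
      boolean faultDomainAwareStandbyEnabled) {
    this.containerToHost = containerToHost;
    this.hostToRacks = hostToRacks;
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
  }

  // Existing logic: true if the conflicting container already sits on the candidate host.
  boolean checkActiveAndStandbyOnSameHost(String host, String conflictingContainerId) {
    String otherHost = containerToHost.get(conflictingContainerId);
    return otherHost != null && otherHost.equals(host);
  }

  // New logic: true if the candidate host shares at least one rack with the conflicting container's host.
  boolean checkActiveAndStandbyOnSameFaultDomain(String host, String conflictingContainerId) {
    String otherHost = containerToHost.get(conflictingContainerId);
    if (otherHost == null) {
      return false;
    }
    Set<String> candidateRacks = hostToRacks.getOrDefault(host, Collections.emptySet());
    Set<String> otherRacks = hostToRacks.getOrDefault(otherHost, Collections.emptySet());
    return !Collections.disjoint(candidateRacks, otherRacks);
  }

  // Returns true when the container may be started on the given host.
  boolean checkStandbyConstraints(String host, String conflictingContainerId) {
    if (checkActiveAndStandbyOnSameHost(host, conflictingContainerId)) {
      return false;
    }
    return !(faultDomainAwareStandbyEnabled
        && checkActiveAndStandbyOnSameFaultDomain(host, conflictingContainerId));
  }
}

final class StandbyConstraintsSketchDemo {
  public static void main(String[] args) {
    Map<String, String> containerToHost = new HashMap<>();
    containerToHost.put("0", "host-1");
    Map<String, Set<String>> hostToRacks = new HashMap<>();
    hostToRacks.put("host-1", Collections.singleton("rack-1"));
    hostToRacks.put("host-2", Collections.singleton("rack-1"));
    SketchStandbyConstraints constraints = new SketchStandbyConstraints(containerToHost, hostToRacks, true);
    System.out.println(constraints.checkStandbyConstraints("host-2", "0"));
  }
}
```

   Neither check needs a "pending"/"running" argument; the caller can log that 
itself when a check fails.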

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +407,41 @@ private FailoverMetadata 
registerActiveContainerFailure(String activeContainerID
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets 
all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain 
aware or not, and requests resources accordingly.
+   *
+   * @param containerAllocator ContainerAllocator object that requests for 
resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   */
+  void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator 
containerAllocator, String containerID, String preferredHost) {

Review comment:
       nit: I'd prefer to rename the method to `requestResource`, since that 
makes the intent of the method clear, i.e. only request a resource and 
potentially return the `SamzaResourceRequest`.
   
   How it requests the resource is kept within the method and can be inferred by 
reading the implementation. The current name seems too much, and the fact that 
this returns void makes it unusable in some places that have the exact same 
boilerplate code.
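
   Something along these lines, as a sketch under assumptions. 
`SketchResourceRequest` and `SketchStandbyRequester` are hypothetical stand-ins, 
racks are plain strings, and the lookup function replaces 
`getAllowedFaultDomainsForStandbyContainerGivenContainerId`. The sketch only 
builds and returns the request; the real method would also issue it through the 
`ContainerAllocator`:

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for SamzaResourceRequest.
final class SketchResourceRequest {
  final String containerId;
  final String preferredHost;
  final Set<String> allowedFaultDomains;

  SketchResourceRequest(String containerId, String preferredHost, Set<String> allowedFaultDomains) {
    this.containerId = containerId;
    this.preferredHost = preferredHost;
    this.allowedFaultDomains = allowedFaultDomains;
  }
}

// Hypothetical holder for the single requestResource entry point.
final class SketchStandbyRequester {
  private final boolean faultDomainAwareStandbyEnabled;
  // Maps a container ID to the fault domains its request may be placed on.
  private final Function<String, Set<String>> allowedFaultDomainsLookup;

  SketchStandbyRequester(boolean faultDomainAwareStandbyEnabled,
      Function<String, Set<String>> allowedFaultDomainsLookup) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
    this.allowedFaultDomainsLookup = allowedFaultDomainsLookup;
  }

  // The fault-domain decision stays inside; callers just ask for a resource and get the request back.
  SketchResourceRequest requestResource(String containerId, String preferredHost) {
    Set<String> allowedFaultDomains = faultDomainAwareStandbyEnabled
        ? allowedFaultDomainsLookup.apply(containerId)
        : Collections.<String>emptySet();
    return new SketchResourceRequest(containerId, preferredHost, allowedFaultDomains);
  }
}

final class RequestResourceSketchDemo {
  public static void main(String[] args) {
    SketchStandbyRequester requester =
        new SketchStandbyRequester(true, containerId -> Collections.singleton("rack-2"));
    SketchResourceRequest request = requester.requestResource("0", "host-3");
    System.out.println(request.containerId + " -> " + request.allowedFaultDomains);
  }
}
```

   Because the request is returned, a caller such as 
`initiateStandbyAwareAllocation` could record it before issuing it, instead of 
repeating the enabled check inline.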

##########
File path: 
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * This class functionality works with the assumption that the 
job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could 
be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+  private static final String FAULT_DOMAIN_MANAGER_GROUP = 
"yarn-fault-domain-manager";
+  private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES = 
"host-to-fault-domain-cache-updates";
+  private Multimap<String, FaultDomain> hostToRackMap;
+  private final YarnClientImpl yarnClient;
+  private Counter hostToFaultDomainCacheUpdates;
+
+  public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
+    this.yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = computeHostToFaultDomainMap();
+    hostToFaultDomainCacheUpdates = 
metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, 
HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  @VisibleForTesting
+  YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl 
yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+    this.yarnClient = yarnClient;
+    yarnClient.init(new YarnConfiguration());
+    yarnClient.start();
+    this.hostToRackMap = hostToRackMap;
+    hostToFaultDomainCacheUpdates = 
metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP, 
HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+  }
+
+  /**
+   * This method returns all the last cached rack values in a cluster, for all 
hosts that are healthy, up and running.
+   * @return a set of {@link FaultDomain}s
+   */
+  @Override
+  public Set<FaultDomain> getAllFaultDomains() {
+    return new HashSet<>(hostToRackMap.values());
+  }
+
+  /**
+   * This method returns the rack a particular host resides on based on the 
internal cache.
+   * In case the rack of a host does not exist in this cache, we update the 
cache by computing the host to rack map again using Yarn.
+   * @param host the host
+   * @return the {@link FaultDomain}
+   */
+  @Override
+  public Set<FaultDomain> getFaultDomainsForHost(String host) {
+    if (!hostToRackMap.containsKey(host)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return new HashSet<>(hostToRackMap.get(host));
+  }
+
+  /**
+   * This method checks if the two hostnames provided reside on the same rack.
+   * @param host1 hostname
+   * @param host2 hostname
+   * @return true if the hosts exist on the same rack
+   */
+  @Override
+  public boolean hasSameFaultDomains(String host1, String host2) {
+    if (!hostToRackMap.keySet().contains(host1) || 
!hostToRackMap.keySet().contains(host2)) {
+      hostToRackMap = computeHostToFaultDomainMap();
+      hostToFaultDomainCacheUpdates.inc();
+    }
+    return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
+  }
+
+  /**
+   * This method computes the host to rack map from Yarn.
+   * Only the hosts that are running in the cluster will be a part of this map.
+   * @return map of the host and the rack it resides on
+   */
+  @VisibleForTesting
+  protected Multimap<String, FaultDomain> computeHostToFaultDomainMap() {

Review comment:
       nit: make this package-private instead if it is only used for testing.



