mynameborat commented on a change in pull request #1446:
URL: https://github.com/apache/samza/pull/1446#discussion_r546927562
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -56,8 +58,13 @@
// Resource-manager, used to stop containers
private ClusterResourceManager clusterResourceManager;
- public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
- ClusterResourceManager clusterResourceManager, LocalityManager
localityManager) {
+ // FaultDomainManager, used to get fault domain information of different
hosts from the cluster manager.
+ private final FaultDomainManager faultDomainManager;
+
+ private final Config config;
+
+ public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
ClusterResourceManager clusterResourceManager,
+ FaultDomainManager faultDomainManager,
LocalityManager localityManager, Config config) {
Review comment:
Consistency: keep the new parameters at the end.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -469,6 +566,10 @@ void releaseUnstartableContainer(SamzaResourceRequest
request, SamzaResource res
resourceRequestState.cancelResourceRequest(request);
}
+ private boolean isFaultDomainAwareStandbyEnabled() {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
+ }
Review comment:
It would be more efficient to determine whether this is enabled once, in the
constructor, and keep the result in an instance variable.
Storing `config` seems unnecessary, and creating a `ClusterManagerConfig` on
every invocation seems sub-optimal.
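
A rough sketch of what I have in mind, using stand-in types rather than the
real Samza classes. `ClusterManagerConfigStub`, the map-based config and the
key name are assumptions made up for illustration; only the shape (read the
flag once in the constructor, keep a `final boolean`, drop the `config`
field) is the point.

```java
import java.util.Collections;
import java.util.Map;

public class StandbyFlagSketch {

  // Hypothetical stand-in for ClusterManagerConfig; the key name is made up.
  static final class ClusterManagerConfigStub {
    static final String ENABLED_KEY = "example.fault-domain-aware-standby.enabled";
    private final Map<String, String> config;

    ClusterManagerConfigStub(Map<String, String> config) {
      this.config = config;
    }

    boolean getFaultDomainAwareStandbyEnabled() {
      return Boolean.parseBoolean(config.getOrDefault(ENABLED_KEY, "false"));
    }
  }

  // Simplified manager: reads the flag once and does not keep the config around.
  static final class StandbyContainerManagerSketch {
    private final boolean isFaultDomainAwareStandbyEnabled;

    StandbyContainerManagerSketch(Map<String, String> config) {
      this.isFaultDomainAwareStandbyEnabled =
          new ClusterManagerConfigStub(config).getFaultDomainAwareStandbyEnabled();
    }

    boolean isFaultDomainAwareStandbyEnabled() {
      return isFaultDomainAwareStandbyEnabled;
    }
  }

  public static void main(String[] args) {
    Map<String, String> config =
        Collections.singletonMap(ClusterManagerConfigStub.ENABLED_KEY, "true");
    System.out.println(
        new StandbyContainerManagerSketch(config).isFaultDomainAwareStandbyEnabled());
  }
}
```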
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -233,9 +275,13 @@ private void initiateStandbyAwareAllocation(String
activeContainerID, String res
standbyHost, activeContainerID, standbyHost, resourceID);
FailoverMetadata failoverMetadata =
this.registerActiveContainerFailure(activeContainerID, resourceID);
+ Set<FaultDomain> allowedFaultDomains = new HashSet<>();
+ if (isFaultDomainAwareStandbyEnabled()) {
+ allowedFaultDomains =
getAllowedFaultDomainsForStandbyContainerGivenContainerId(activeContainerID);
+ }
// record the resource request, before issuing it to avoid race with
allocation-thread
SamzaResourceRequest resourceRequestForActive =
- containerAllocator.getResourceRequest(activeContainerID,
standbyHost);
+ containerAllocator.getResourceRequest(activeContainerID,
standbyHost, allowedFaultDomains);
Review comment:
Boilerplate: the function introduced below in this PR already does this.
Context: `checkFaultDomainAwarenessEnabledAndRequestResource`
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -375,24 +454,39 @@ boolean checkStandbyConstraints(String
containerIdToStart, String host) {
SamzaResource resource =
samzaApplicationState.pendingProcessors.get(containerID);
// return false if a conflicting container is pending for launch on the
host
- if (resource != null && resource.getHost().equals(host)) {
- log.info("Container {} cannot be started on host {} because container
{} is already scheduled on this host",
- containerIdToStart, host, containerID);
+ if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource,
containerID, "pending")) {
return false;
}
// return false if a conflicting container is running on the host
resource = samzaApplicationState.runningProcessors.get(containerID);
- if (resource != null && resource.getHost().equals(host)) {
- log.info("Container {} cannot be started on host {} because container
{} is already running on this host",
- containerIdToStart, host, containerID);
+ if (!checkStandbyConstraintsHelper(containerIdToStart, host, resource,
containerID, "running")) {
Review comment:
This still feels unclean to me, especially passing in parameters that are not
relevant to the core of what the method is responsible for, e.g. "pending" vs
"running", which are purely for logging and aren't state per se. One doesn't
realize that until reading through the code underneath.
If you don't want to inline and still feel a refactor would help make it
clearer, I'd suggest extracting the existing logic into
`checkActiveAndStandbyOnSameHost` and adding your logic to
`checkActiveAndStandbyOnSameFaultDomain`. Within `checkStandbyConstraints` you
can then run both of these checks or just one of them, depending on whether
fault domain awareness is enabled.
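
Roughly the shape I mean, as a self-contained sketch. The maps below are
hypothetical stand-ins for `samzaApplicationState.pendingProcessors` /
`runningProcessors` and for the fault domain manager, racks are plain strings,
and treating "same fault domain" as "the rack sets overlap" is my assumption
here, not necessarily what the PR should do.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StandbyConstraintSketch {
  // Hypothetical stand-ins: container ID -> host, and host -> racks.
  private final Map<String, String> pendingHosts;
  private final Map<String, String> runningHosts;
  private final Map<String, Set<String>> hostToRacks;
  private final boolean faultDomainAwareStandbyEnabled;

  StandbyConstraintSketch(Map<String, String> pendingHosts, Map<String, String> runningHosts,
      Map<String, Set<String>> hostToRacks, boolean faultDomainAwareStandbyEnabled) {
    this.pendingHosts = pendingHosts;
    this.runningHosts = runningHosts;
    this.hostToRacks = hostToRacks;
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
  }

  // True if the conflicting container is pending or running on the given host.
  boolean checkActiveAndStandbyOnSameHost(String conflictingContainerId, String host) {
    return host.equals(pendingHosts.get(conflictingContainerId))
        || host.equals(runningHosts.get(conflictingContainerId));
  }

  // True if the conflicting container's host shares a rack with the given host.
  boolean checkActiveAndStandbyOnSameFaultDomain(String conflictingContainerId, String host) {
    Set<String> candidateRacks = racksOf(host);
    List<String> otherHosts = Arrays.asList(
        pendingHosts.get(conflictingContainerId), runningHosts.get(conflictingContainerId));
    for (String otherHost : otherHosts) {
      if (otherHost != null && !Collections.disjoint(candidateRacks, racksOf(otherHost))) {
        return true;
      }
    }
    return false;
  }

  // The top-level check only decides which of the two checks to run.
  boolean checkStandbyConstraints(List<String> conflictingContainerIds, String host) {
    for (String containerId : conflictingContainerIds) {
      if (checkActiveAndStandbyOnSameHost(containerId, host)) {
        return false;
      }
      if (faultDomainAwareStandbyEnabled
          && checkActiveAndStandbyOnSameFaultDomain(containerId, host)) {
        return false;
      }
    }
    return true;
  }

  private Set<String> racksOf(String host) {
    return hostToRacks.getOrDefault(host, Collections.<String>emptySet());
  }
}
```

Each helper can do its own logging, so nothing like "pending" or "running"
has to be passed in as a parameter.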
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
##########
@@ -361,8 +407,41 @@ private FailoverMetadata
registerActiveContainerFailure(String activeContainerID
}
/**
- * Check if matching this SamzaResourceRequest to the given resource, meets
all standby-container container constraints.
+ * This method checks from the config if standby allocation is fault domain
aware or not, and requests resources accordingly.
+ *
+ * @param containerAllocator ContainerAllocator object that requests for
resources from the resource manager
+ * @param containerID Samza container ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ */
+ void checkFaultDomainAwarenessEnabledAndRequestResource(ContainerAllocator
containerAllocator, String containerID, String preferredHost) {
Review comment:
nit: I'd prefer to rename the method to `requestResource`, since that makes
the intent of the method clear, i.e. it only requests a resource and
potentially returns the `SamzaResourceRequest`.
How it requests the resource is kept within the method and can be inferred by
reading the implementation. The current name seems too much, and the fact that
it returns void makes it unusable in some places that have the exact same
boilerplate code.
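
Something along these lines, as a sketch only. `AllocatorStub` and
`ResourceRequestStub` are hypothetical stand-ins for `ContainerAllocator` and
`SamzaResourceRequest`, fault domains are plain strings, and the
per-container lookup map is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RequestResourceSketch {

  // Hypothetical stand-in for SamzaResourceRequest.
  static final class ResourceRequestStub {
    final String containerId;
    final String preferredHost;
    final Set<String> allowedFaultDomains;

    ResourceRequestStub(String containerId, String preferredHost, Set<String> allowedFaultDomains) {
      this.containerId = containerId;
      this.preferredHost = preferredHost;
      this.allowedFaultDomains = allowedFaultDomains;
    }
  }

  // Hypothetical stand-in for ContainerAllocator; it just records what was issued.
  static final class AllocatorStub {
    final List<ResourceRequestStub> issued = new ArrayList<>();

    ResourceRequestStub getResourceRequest(String containerId, String host, Set<String> domains) {
      return new ResourceRequestStub(containerId, host, domains);
    }

    void issueResourceRequest(ResourceRequestStub request) {
      issued.add(request);
    }
  }

  private final boolean faultDomainAwareStandbyEnabled;
  private final Map<String, Set<String>> allowedFaultDomainsByContainer;

  RequestResourceSketch(boolean faultDomainAwareStandbyEnabled,
      Map<String, Set<String>> allowedFaultDomainsByContainer) {
    this.faultDomainAwareStandbyEnabled = faultDomainAwareStandbyEnabled;
    this.allowedFaultDomainsByContainer = allowedFaultDomainsByContainer;
  }

  // The fault domain decision is an implementation detail; callers get the request back.
  ResourceRequestStub requestResource(AllocatorStub allocator, String containerId,
      String preferredHost) {
    Set<String> allowed = faultDomainAwareStandbyEnabled
        ? allowedFaultDomainsByContainer.getOrDefault(containerId, Collections.<String>emptySet())
        : Collections.<String>emptySet();
    ResourceRequestStub request = allocator.getResourceRequest(containerId, preferredHost, allowed);
    allocator.issueResourceRequest(request);
    return request;
  }
}
```

With a return value, call sites that need to hold on to the request can use
the same method instead of repeating the enabled check.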
##########
File path:
samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnFaultDomainManager.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.samza.clustermanager.FaultDomain;
+import org.apache.samza.clustermanager.FaultDomainManager;
+import org.apache.samza.clustermanager.FaultDomainType;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * This class functionality works with the assumption that the
job.standbytasks.replication.factor is 2.
+ * For values greater than 2, it is possible that the standby containers could
be on the same rack as the active, or the already existing standby racks.
+ */
+public class YarnFaultDomainManager implements FaultDomainManager {
+
+ private static final String FAULT_DOMAIN_MANAGER_GROUP =
"yarn-fault-domain-manager";
+ private static final String HOST_TO_FAULT_DOMAIN_CACHE_UPDATES =
"host-to-fault-domain-cache-updates";
+ private Multimap<String, FaultDomain> hostToRackMap;
+ private final YarnClientImpl yarnClient;
+ private Counter hostToFaultDomainCacheUpdates;
+
+ public YarnFaultDomainManager(MetricsRegistry metricsRegistry) {
+ this.yarnClient = new YarnClientImpl();
+ yarnClient.init(new YarnConfiguration());
+ yarnClient.start();
+ this.hostToRackMap = computeHostToFaultDomainMap();
+ hostToFaultDomainCacheUpdates =
metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP,
HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+ }
+
+ @VisibleForTesting
+ YarnFaultDomainManager(MetricsRegistry metricsRegistry, YarnClientImpl
yarnClient, Multimap<String, FaultDomain> hostToRackMap) {
+ this.yarnClient = yarnClient;
+ yarnClient.init(new YarnConfiguration());
+ yarnClient.start();
+ this.hostToRackMap = hostToRackMap;
+ hostToFaultDomainCacheUpdates =
metricsRegistry.newCounter(FAULT_DOMAIN_MANAGER_GROUP,
HOST_TO_FAULT_DOMAIN_CACHE_UPDATES);
+ }
+
+ /**
+ * This method returns all the last cached rack values in a cluster, for all
hosts that are healthy, up and running.
+ * @return a set of {@link FaultDomain}s
+ */
+ @Override
+ public Set<FaultDomain> getAllFaultDomains() {
+ return new HashSet<>(hostToRackMap.values());
+ }
+
+ /**
+ * This method returns the rack a particular host resides on based on the
internal cache.
+ * In case the rack of a host does not exist in this cache, we update the
cache by computing the host to rack map again using Yarn.
+ * @param host the host
+ * @return the {@link FaultDomain}
+ */
+ @Override
+ public Set<FaultDomain> getFaultDomainsForHost(String host) {
+ if (!hostToRackMap.containsKey(host)) {
+ hostToRackMap = computeHostToFaultDomainMap();
+ hostToFaultDomainCacheUpdates.inc();
+ }
+ return new HashSet<>(hostToRackMap.get(host));
+ }
+
+ /**
+ * This method checks if the two hostnames provided reside on the same rack.
+ * @param host1 hostname
+ * @param host2 hostname
+ * @return true if the hosts exist on the same rack
+ */
+ @Override
+ public boolean hasSameFaultDomains(String host1, String host2) {
+ if (!hostToRackMap.keySet().contains(host1) ||
!hostToRackMap.keySet().contains(host2)) {
+ hostToRackMap = computeHostToFaultDomainMap();
+ hostToFaultDomainCacheUpdates.inc();
+ }
+ return hostToRackMap.get(host1).equals(hostToRackMap.get(host2));
+ }
+
+ /**
+ * This method computes the host to rack map from Yarn.
+ * Only the hosts that are running in the cluster will be a part of this map.
+ * @return map of the host and the rack it resides on
+ */
+ @VisibleForTesting
+ protected Multimap<String, FaultDomain> computeHostToFaultDomainMap() {
Review comment:
nit: make this package-private instead if it is only used for testing.