This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 5748fa6 SAMZA-2605: Make Standby Container Requests Rack Aware
(#1446)
5748fa6 is described below
commit 5748fa6adec729840beef03495aa6b74f739f017
Author: Pawas Chhokra <[email protected]>
AuthorDate: Thu Dec 24 11:20:54 2020 -0800
SAMZA-2605: Make Standby Container Requests Rack Aware (#1446)
Feature: The aim of this feature is to make all standby container requests
rack aware, so that every active container and its corresponding standby
containers are always on different racks. This helps reduce application
downtime during rack failures.
One of the requirements of this feature is that the value of
job.standbytasks.replication.factor must be at most 2 for the rack awareness
functionality to be honored.
Changes: This PR uses the FaultDomainManager interface for Yarn to request
rack-aware nodes when making standby container requests.
Usage Instructions: For a job with host affinity and standby containers,
set the config cluster-manager.fault-domain-aware.standby.enabled to true to
enable this feature.
---
.../samza/clustermanager/ContainerAllocator.java | 53 +++++++++
.../samza/clustermanager/ContainerManager.java | 6 +-
.../clustermanager/ContainerProcessManager.java | 27 ++++-
.../clustermanager/StandbyContainerManager.java | 130 ++++++++++++++++++---
.../TestClusterBasedJobCoordinator.java | 1 +
.../TestContainerAllocatorWithHostAffinity.java | 11 +-
.../TestContainerAllocatorWithoutHostAffinity.java | 9 +-
.../TestContainerPlacementActions.java | 15 ++-
.../TestContainerProcessManager.java | 33 +++---
.../samza/job/yarn/YarnClusterResourceManager.java | 7 +-
10 files changed, 239 insertions(+), 53 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index fa5f783..88be21f 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
@@ -348,6 +349,16 @@ public class ContainerAllocator implements Runnable {
}
/**
+ * Requests a resource from the cluster manager
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param faultDomains set of fault domains on which to schedule this
resource
+ */
+ public final void requestResource(String processorId, String preferredHost,
Set<FaultDomain> faultDomains) {
+ requestResourceWithDelay(processorId, preferredHost, Duration.ZERO,
faultDomains);
+ }
+
+ /**
* Requests a resource from the cluster manager with a request timestamp of
the current time plus the specified delay.
* @param processorId Samza processor ID that will be run when a resource is
allocated for this request
* @param preferredHost name of the host that you prefer to run the
processor on
@@ -359,6 +370,18 @@ public class ContainerAllocator implements Runnable {
}
/**
+ * Requests a resource from the cluster manager with a request timestamp of
the current time plus the specified delay.
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param delay the {@link Duration} to add to the request timestamp
+ * @param faultDomains set of fault domains on which to schedule this
resource
+ */
+ public final void requestResourceWithDelay(String processorId, String
preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+ SamzaResourceRequest request = getResourceRequestWithDelay(processorId,
preferredHost, delay, faultDomains);
+ issueResourceRequest(request);
+ }
+
+ /**
* Creates a {@link SamzaResourceRequest} to send to the cluster manager
* @param processorId Samza processor ID that will be run when a resource is
allocated for this request
* @param preferredHost name of the host that you prefer to run the
processor on
@@ -369,6 +392,17 @@ public class ContainerAllocator implements Runnable {
}
/**
+ * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param faultDomains set of fault domains on which to schedule this
resource
+ * @return the created request
+ */
+ public final SamzaResourceRequest getResourceRequest(String processorId,
String preferredHost, Set<FaultDomain> faultDomains) {
+ return getResourceRequestWithDelay(processorId, preferredHost,
Duration.ZERO, faultDomains);
+ }
+
+ /**
* Creates a {@link SamzaResourceRequest} to send to the cluster manager
with a request timestamp of the current time
* plus the specified delay.
* @param processorId Samza processor ID that will be run when a resource is
allocated for this request
@@ -380,6 +414,19 @@ public class ContainerAllocator implements Runnable {
return new SamzaResourceRequest(this.containerNumCpuCores,
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
}
+ /**
+ * Creates a {@link SamzaResourceRequest} to send to the cluster manager
with a request timestamp of the current time
+ * plus the specified delay.
+ * @param processorId Samza processor ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param delay the {@link Duration} to add to the request timestamp
+ * @param faultDomains set of fault domains on which to schedule this
resource
+ * @return the created request
+ */
+ public final SamzaResourceRequest getResourceRequestWithDelay(String
processorId, String preferredHost, Duration delay, Set<FaultDomain>
faultDomains) {
+ return new SamzaResourceRequest(this.containerNumCpuCores,
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay),
faultDomains);
+ }
+
public final void issueResourceRequest(SamzaResourceRequest request) {
resourceRequestState.addResourceRequest(request);
state.containerRequests.incrementAndGet();
@@ -388,6 +435,9 @@ public class ContainerAllocator implements Runnable {
} else {
state.preferredHostRequests.incrementAndGet();
}
+ if (!request.getFaultDomains().isEmpty()) {
+ state.faultDomainAwareContainerRequests.incrementAndGet();
+ }
}
/**
@@ -480,5 +530,8 @@ public class ContainerAllocator implements Runnable {
} else {
state.expiredPreferredHostRequests.incrementAndGet();
}
+ if (!request.getFaultDomains().isEmpty()) {
+ state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
+ }
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 24130fd..5fcf328 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.Config;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.placement.ContainerPlacementMessage;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
@@ -88,8 +89,9 @@ public class ContainerManager {
public ContainerManager(ContainerPlacementMetadataStore
containerPlacementMetadataStore,
SamzaApplicationState samzaApplicationState, ClusterResourceManager
clusterResourceManager,
- boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager
localityManager) {
+ boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager
localityManager, FaultDomainManager faultDomainManager, Config config) {
Preconditions.checkNotNull(localityManager, "Locality manager cannot be
null");
+ Preconditions.checkNotNull(faultDomainManager, "Fault domain manager
cannot be null");
this.samzaApplicationState = samzaApplicationState;
this.clusterResourceManager = clusterResourceManager;
this.actions = new ConcurrentHashMap<>();
@@ -100,7 +102,7 @@ public class ContainerManager {
// Enable standby container manager if required
if (standByEnabled) {
this.standbyContainerManager =
- Optional.of(new StandbyContainerManager(samzaApplicationState,
clusterResourceManager, localityManager));
+ Optional.of(new StandbyContainerManager(samzaApplicationState,
clusterResourceManager, localityManager, config, faultDomainManager));
} else {
this.standbyContainerManager = Optional.empty();
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 995cf7d..143e0b3 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -149,6 +149,9 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
ResourceManagerFactory factory =
getContainerProcessManagerFactory(clusterManagerConfig);
this.clusterResourceManager =
checkNotNull(factory.getClusterResourceManager(this, state));
+ FaultDomainManagerFactory faultDomainManagerFactory =
getFaultDomainManagerFactory(clusterManagerConfig);
+ FaultDomainManager faultDomainManager =
checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, registry));
+
// Initialize metrics
this.containerProcessManagerMetrics = new
ContainerProcessManagerMetrics(config, state, registry);
this.jvmMetrics = new JvmMetrics(registry);
@@ -172,8 +175,8 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
// Wire all metrics to all reporters
this.metricsReporters.values().forEach(reporter ->
reporter.register(METRICS_SOURCE_NAME, registry));
- this.containerManager = new ContainerManager(metadataStore, state,
clusterResourceManager, hostAffinityEnabled,
- jobConfig.getStandbyTasksEnabled(), localityManager);
+ this.containerManager = new ContainerManager(metadataStore, state,
clusterResourceManager,
+ hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(),
localityManager, faultDomainManager, config);
this.containerAllocator = new
ContainerAllocator(this.clusterResourceManager, config, state,
hostAffinityEnabled, this.containerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container
Allocator Thread");
@@ -649,6 +652,26 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
}
/**
+ * Returns an instantiated {@link FaultDomainManagerFactory} from a {@link
ClusterManagerConfig}. The
+ * {@link FaultDomainManagerFactory} is used to return an implementation of
a {@link FaultDomainManager}
+ *
+ * @param clusterManagerConfig, the cluster manager config to parse.
+ *
+ */
+ private FaultDomainManagerFactory getFaultDomainManagerFactory(final
ClusterManagerConfig clusterManagerConfig) {
+ final String faultDomainManagerFactoryClass =
clusterManagerConfig.getFaultDomainManagerClass();
+ final FaultDomainManagerFactory factory;
+
+ try {
+ factory = ReflectionUtil.getObj(faultDomainManagerFactoryClass,
FaultDomainManagerFactory.class);
+ } catch (Exception e) {
+ LOG.error("Error creating the fault domain manager.", e);
+ throw new SamzaException(e);
+ }
+ return factory;
+ }
+
+ /**
* Obtains the ID of the Samza processor pending launch on the provided
resource (container).
*
* ContainerProcessManager [INFO] Container ID:
container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on
host: ltx1-app0772.stg.linkedin.com
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index b849ea5..a07a924 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.clustermanager;
import java.time.Duration;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -28,6 +29,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.job.model.JobModel;
@@ -56,8 +59,13 @@ public class StandbyContainerManager {
// Resource-manager, used to stop containers
private ClusterResourceManager clusterResourceManager;
- public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
- ClusterResourceManager clusterResourceManager, LocalityManager
localityManager) {
+ // FaultDomainManager, used to get fault domain information of different
hosts from the cluster manager.
+ private final FaultDomainManager faultDomainManager;
+
+ private final boolean isFaultDomainAwareStandbyEnabled;
+
+ public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
ClusterResourceManager clusterResourceManager,
+ LocalityManager localityManager, Config
config, FaultDomainManager faultDomainManager) {
this.failovers = new ConcurrentHashMap<>();
this.localityManager = localityManager;
this.standbyContainerConstraints = new HashMap<>();
@@ -70,6 +78,9 @@ public class StandbyContainerManager {
.forEach(containerId -> standbyContainerConstraints.put(containerId,
StandbyTaskUtil.getStandbyContainerConstraints(containerId,
jobModel)));
this.clusterResourceManager = clusterResourceManager;
+ this.faultDomainManager = faultDomainManager;
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ this.isFaultDomainAwareStandbyEnabled =
clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
log.info("Populated standbyContainerConstraints map {}",
standbyContainerConstraints);
}
@@ -125,7 +136,9 @@ public class StandbyContainerManager {
if (StandbyTaskUtil.isStandbyContainer(containerID)) {
log.info("Handling launch fail for standby-container {}, requesting
resource on any host {}", containerID);
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
+ String activeContainerHost = getActiveContainerHost(containerID)
+ .orElse(null);
+ requestResource(containerAllocator, containerID,
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
} else {
initiateStandbyAwareAllocation(containerID, resourceID,
containerAllocator);
}
@@ -157,6 +170,22 @@ public class StandbyContainerManager {
}
/**
+ * This method removes the fault domain of the host passed as an argument,
from the set of fault domains, and then returns it.
+ * The set of fault domains returned is based on the set difference between
all the available fault domains in the
+ * cluster and the fault domain associated with the host that is passed as
input.
+ * @param hostToAvoid hostname whose fault domains are excluded
+ * @return The set of fault domains which excludes the fault domain that the
given host is on
+ */
+ public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String
hostToAvoid) {
+ Set<FaultDomain> allFaultDomains = faultDomainManager.getAllFaultDomains();
+ Set<FaultDomain> faultDomainToAvoid = Optional.ofNullable(hostToAvoid)
+ .map(faultDomainManager::getFaultDomainsForHost)
+ .orElse(Collections.emptySet());
+ allFaultDomains.removeAll(faultDomainToAvoid);
+ return allFaultDomains;
+ }
+
+ /**
* If a standby container has stopped, then there are two possible cases
* Case 1. during a failover, the standby container was stopped for an
active's start, then we
* 1. request a resource on the standby's host to place the
activeContainer, and
@@ -181,24 +210,29 @@ public class StandbyContainerManager {
// request standbycontainer's host for active-container
SamzaResourceRequest resourceRequestForActive =
- containerAllocator.getResourceRequestWithDelay(activeContainerID,
standbyContainerHostname, preferredHostRetryDelay);
+ containerAllocator.getResourceRequestWithDelay(activeContainerID,
standbyContainerHostname, preferredHostRetryDelay);
// record the resource request, before issuing it to avoid race with
allocation-thread
failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
// request any-host for standby container
- containerAllocator.requestResource(standbyContainerID,
ResourceRequestState.ANY_HOST);
+ requestResource(containerAllocator, standbyContainerID,
ResourceRequestState.ANY_HOST, Duration.ZERO, standbyContainerHostname);
} else {
log.info("Issuing request for standby container {} on host {}, since
this is not for a failover",
standbyContainerID, preferredHost);
- containerAllocator.requestResourceWithDelay(standbyContainerID,
preferredHost, preferredHostRetryDelay);
+ String activeContainerHost = getActiveContainerHost(standbyContainerID)
+ .orElse(null);
+ requestResource(containerAllocator, standbyContainerID, preferredHost,
preferredHostRetryDelay, activeContainerHost);
}
}
/** Method to handle standby-aware allocation for an active container.
* We try to find a standby host for the active container, and issue a stop
on any standby-containers running on it,
* request resource to place the active on the standby's host, and one to
place the standby elsewhere.
- *
+ * When requesting for resources,
+ * NOTE: When rack awareness is turned on, we always pass the
<code>hostToAvoid</> parameter as null for the {@link #requestResource} method
used here
+ * because the hostname of the previous active processor that died does not
exist in the running or pending container list anymore.
+ * However, different racks will always be guaranteed through {@link
#checkStandbyConstraintsAndRunStreamProcessor}.
* @param activeContainerID the samzaContainerID of the active-container
* @param resourceID the samza-resource-ID of the container when it failed
(used to index failover-state)
*/
@@ -235,7 +269,7 @@ public class StandbyContainerManager {
// record the resource request, before issuing it to avoid race with
allocation-thread
SamzaResourceRequest resourceRequestForActive =
- containerAllocator.getResourceRequest(activeContainerID,
standbyHost);
+ containerAllocator.getResourceRequest(activeContainerID,
standbyHost);
failoverMetadata.recordResourceRequest(resourceRequestForActive);
containerAllocator.issueResourceRequest(resourceRequestForActive);
samzaApplicationState.failoversToStandby.incrementAndGet();
@@ -281,7 +315,7 @@ public class StandbyContainerManager {
Optional<FailoverMetadata> failoverMetadata =
getFailoverMetadata(activeContainerResourceID);
// Iterate over the list of running standby containers, to find a standby
resource that we have not already
- // used for a failover for this active resoruce
+ // used for a failover for this active resource
for (String standbyContainerID :
this.standbyContainerConstraints.get(activeContainerID)) {
if
(samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
@@ -307,7 +341,7 @@ public class StandbyContainerManager {
.map(ProcessorLocality::host)
.orElse(null);
- if (StringUtils.isNotBlank(standbyHost)) {
+ if (StringUtils.isBlank(standbyHost)) {
log.info("No last known standbyHost for container {}",
standbyContainerID);
} else if (failoverMetadata.isPresent() &&
failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
@@ -361,8 +395,44 @@ public class StandbyContainerManager {
}
/**
- * Check if matching this SamzaResourceRequest to the given resource, meets
all standby-container container constraints.
+ * This method checks from the config if standby allocation is fault domain
aware or not, and requests resources accordingly.
+ *
+ * @param containerAllocator ContainerAllocator object that requests for
resources from the resource manager
+ * @param containerID Samza container ID that will be run when a resource is
allocated for this request
+ * @param preferredHost name of the host that you prefer to run the
processor on
+ * @param preferredHostRetryDelay the {@link Duration} to add to the request
timestamp
+ * @param hostToAvoid The hostname to avoid requesting this resource on if
fault domain aware standby allocation is enabled
+ */
+ void requestResource(ContainerAllocator containerAllocator, String
containerID, String preferredHost, Duration preferredHostRetryDelay, String
hostToAvoid) {
+ if (StandbyTaskUtil.isStandbyContainer(containerID) &&
isFaultDomainAwareStandbyEnabled) {
+ containerAllocator.requestResourceWithDelay(containerID, preferredHost,
preferredHostRetryDelay, getAllowedFaultDomainsGivenHostToAvoid(hostToAvoid));
+ } else {
+ containerAllocator.requestResourceWithDelay(containerID, preferredHost,
preferredHostRetryDelay, new HashSet<>());
+ }
+ }
+
+ /**
+ * This method returns the active container host given a standby or active
container ID.
*
+ * @param containerID Standby or active container container ID
+ * @return The active container host
+ */
+ Optional<String> getActiveContainerHost(String containerID) {
+ String activeContainerId = containerID;
+ if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+ activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+ }
+ SamzaResource resource =
samzaApplicationState.pendingProcessors.get(activeContainerId);
+ if (resource == null) {
+ resource =
samzaApplicationState.runningProcessors.get(activeContainerId);
+ }
+ return Optional.ofNullable(resource)
+ .map(SamzaResource::getHost);
+ }
+
+ /**
+ * Check if matching this SamzaResourceRequest to the given resource, meets
all standby-container container constraints.
+ * This includes the check that a standby and its active should not be on
the same fault domain or the same host.
* @param containerIdToStart logical id of the container to start
* @param host potential host to start the container on
* @return
@@ -375,17 +445,33 @@ public class StandbyContainerManager {
SamzaResource resource =
samzaApplicationState.pendingProcessors.get(containerID);
// return false if a conflicting container is pending for launch on the
host
- if (resource != null && resource.getHost().equals(host)) {
+ if (resource != null && isFaultDomainAwareStandbyEnabled
+ && faultDomainManager.hasSameFaultDomains(host,
resource.getHost())) {
+ log.info("Container {} cannot be started on host {} because container
{} is already scheduled on this fault domain",
+ containerIdToStart, host, containerID);
+ if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+ }
+ return false;
+ } else if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container
{} is already scheduled on this host",
- containerIdToStart, host, containerID);
+ containerIdToStart, host, containerID);
return false;
}
// return false if a conflicting container is running on the host
resource = samzaApplicationState.runningProcessors.get(containerID);
- if (resource != null && resource.getHost().equals(host)) {
+ if (resource != null && isFaultDomainAwareStandbyEnabled
+ && faultDomainManager.hasSameFaultDomains(host,
resource.getHost())) {
+ log.info("Container {} cannot be started on host {} because container
{} is already running on this fault domain",
+ containerIdToStart, host, containerID);
+ if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+ }
+ return false;
+ } else if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container
{} is already running on this host",
- containerIdToStart, host, containerID);
+ containerIdToStart, host, containerID);
return false;
}
}
@@ -409,16 +495,21 @@ public class StandbyContainerManager {
log.info("Running container {} on {} meets standby constraints,
preferredHost = {}", containerID,
samzaResource.getHost(), preferredHost);
containerAllocator.runStreamProcessor(request, preferredHost);
+ if (isFaultDomainAwareStandbyEnabled &&
StandbyTaskUtil.isStandbyContainer(containerID)) {
+
samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
+ }
} else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
// This resource cannot be used to launch this standby container, so we
make a new anyhost request
log.info(
"Running standby container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource, and making a new
ANY_HOST request",
containerID, samzaResource.getHost());
releaseUnstartableContainer(request, samzaResource, preferredHost,
resourceRequestState);
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
+ String activeContainerHost = getActiveContainerHost(containerID)
+ .orElse(null);
+ requestResource(containerAllocator, containerID,
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
} else {
- // This resource cannot be used to launch this active container
container, so we initiate a failover
+ // This resource cannot be used to launch this active container, so we
initiate a failover
log.warn(
"Running active container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource",
containerID, samzaResource.getHost());
@@ -469,7 +560,6 @@ public class StandbyContainerManager {
resourceRequestState.cancelResourceRequest(request);
}
-
// Handle an expired resource request that was made for placing a standby
container
private void handleExpiredRequestForStandbyContainer(String containerID,
SamzaResourceRequest request,
Optional<SamzaResource> alternativeResource, ContainerAllocator
containerAllocator,
@@ -486,7 +576,9 @@ public class StandbyContainerManager {
// If there is no alternative-resource for the standby container we make
a new anyhost request
log.info("Handling expired request, requesting anyHost resource for
standby container {}", containerID);
resourceRequestState.cancelResourceRequest(request);
- containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
+ String activeContainerHost = getActiveContainerHost(containerID)
+ .orElse(null);
+ requestResource(containerAllocator, containerID,
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 50a1ee1..e0b0739 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -88,6 +88,7 @@ public class TestClusterBasedJobCoordinator {
configMap.put("task.inputs", "kafka.topic1");
configMap.put("systems.kafka.samza.factory",
"org.apache.samza.system.MockSystemFactory");
configMap.put("samza.cluster-manager.factory",
"org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+ configMap.put("cluster-manager.fault-domain-manager.factory",
"org.apache.samza.clustermanager.MockFaultDomainManagerFactory");
configMap.put("job.coordinator.monitor-partition-change.frequency.ms",
"1");
MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka",
"topic1", new Partition(0)), new ArrayList<>());
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2b4a4b0..2c9ba81 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -64,6 +64,7 @@ public class TestContainerAllocatorWithHostAffinity {
private final SamzaApplicationState state = new
SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ private final FaultDomainManager faultDomainManager =
mock(FaultDomainManager.class);
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
private ContainerManager containerManager;
@@ -89,7 +90,7 @@ public class TestContainerAllocatorWithHostAffinity {
coordinatorStreamStore.init();
containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
containerPlacementMetadataStore.start();
- containerManager = new ContainerManager(containerPlacementMetadataStore,
state, clusterResourceManager, true, false, mockLocalityManager);
+ containerManager = new ContainerManager(containerPlacementMetadataStore,
state, clusterResourceManager, true, false, mockLocalityManager,
faultDomainManager, config);
containerAllocator =
new ContainerAllocator(clusterResourceManager, config, state, true,
containerManager);
requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -369,7 +370,7 @@ public class TestContainerAllocatorWithHostAffinity {
ClusterResourceManager.Callback mockCPM =
mock(MockClusterResourceManagerCallback.class);
ClusterResourceManager mockClusterResourceManager = new
MockClusterResourceManager(mockCPM, state);
ContainerManager containerManager =
- new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false, mock(LocalityManager.class));
+ new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false, mock(LocalityManager.class),
faultDomainManager, config);
// Mock the callback from ClusterManager to add resources to the allocator
doAnswer((InvocationOnMock invocation) -> {
SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0,
List.class).get(0);
@@ -416,7 +417,7 @@ public class TestContainerAllocatorWithHostAffinity {
public void testExpiredRequestAllocationOnAnyHost() throws Exception {
MockClusterResourceManager spyManager = spy(new
MockClusterResourceManager(callback, state));
ContainerManager spyContainerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
spyManager, true, false, mock(LocalityManager.class)));
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
spyManager, true, false, mock(LocalityManager.class), faultDomainManager,
config));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyManager, config, state, true,
spyContainerManager));
// Request Preferred Resources
@@ -460,7 +461,7 @@ public class TestContainerAllocatorWithHostAffinity {
// Add Extra Resources
MockClusterResourceManager spyClusterResourceManager = spy(new
MockClusterResourceManager(callback, state));
ContainerManager spyContainerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
spyClusterResourceManager, true, false, mock(LocalityManager.class)));
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
spyClusterResourceManager, true, false, mock(LocalityManager.class),
faultDomainManager, config));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyClusterResourceManager, config, state, true,
spyContainerManager));
@@ -513,7 +514,7 @@ public class TestContainerAllocatorWithHostAffinity {
ClusterResourceManager.Callback mockCPM =
mock(MockClusterResourceManagerCallback.class);
MockClusterResourceManager mockClusterResourceManager = new
MockClusterResourceManager(mockCPM, state);
ContainerManager spyContainerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false, mock(LocalityManager.class)));
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false, mock(LocalityManager.class),
faultDomainManager, config));
SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000,
"host-0", "id0",
System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index ac5d6f3..1f063d7 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -62,6 +62,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
private final SamzaApplicationState state = new
SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager manager = new
MockClusterResourceManager(callback, state);
+ private final FaultDomainManager faultDomainManager =
mock(FaultDomainManager.class);
private CoordinatorStreamStore coordinatorStreamStore;
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
@@ -83,7 +84,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
containerPlacementMetadataStore.start();
containerAllocator = new ContainerAllocator(manager, config, state, false,
- new ContainerManager(containerPlacementMetadataStore, state, manager,
false, false, mockLocalityManager));
+ new ContainerManager(containerPlacementMetadataStore, state, manager,
false, false, mockLocalityManager, faultDomainManager, config));
requestState = new MockContainerRequestState(manager, false);
Field requestStateField =
containerAllocator.getClass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
@@ -179,9 +180,11 @@ public class TestContainerAllocatorWithoutHostAffinity {
});
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new
HashMap<>()));
+ ContainerManager containerManager = new
ContainerManager(containerPlacementMetadataStore, state, manager, false,
+ false, mockLocalityManager, faultDomainManager, config);
containerAllocator =
MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager,
config, state,
- new ContainerManager(containerPlacementMetadataStore, state,
manager, false, false, mockLocalityManager),
+ containerManager,
override);
MockContainerAllocatorWithoutHostAffinity mockAllocator =
(MockContainerAllocatorWithoutHostAffinity) containerAllocator;
@@ -331,7 +334,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
ClusterResourceManager.Callback mockCPM =
mock(ClusterResourceManager.Callback.class);
ClusterResourceManager mockManager = new
MockClusterResourceManager(mockCPM, state);
ContainerManager spyContainerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
mockManager, false, false, mock(LocalityManager.class)));
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
mockManager, false, false, mock(LocalityManager.class), faultDomainManager,
config));
spyAllocator = Mockito.spy(
new ContainerAllocator(mockManager, config, state, false,
spyContainerManager));
// Mock the callback from ClusterManager to add resources to the allocator
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index c781f4d..e5ead9e 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -141,13 +141,14 @@ public class TestContainerPlacementActions {
state = new
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(),
2, server));
callback = mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
localityManager = mock(LocalityManager.class);
when(localityManager.readLocality())
.thenReturn(new LocalityModel(ImmutableMap.of(
"0", new ProcessorLocality("0", "host-1"),
"1", new ProcessorLocality("1", "host-2"))));
- containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager));
+ containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager, faultDomainManager,
config));
allocatorWithHostAffinity = new
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state,
containerManager);
cpm = new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager, localityManager, false);
@@ -171,9 +172,10 @@ public class TestContainerPlacementActions {
state = new SamzaApplicationState(getJobModelManagerWithStandby());
callback = mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
// Enable standby
- containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, true, mockLocalityManager));
+ containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, true, mockLocalityManager, faultDomainManager,
config));
allocatorWithHostAffinity = new
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state,
containerManager);
cpm = new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager, mockLocalityManager, false);
@@ -552,8 +554,9 @@ public class TestContainerPlacementActions {
state = new
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(),
2, this.server));
callback = mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
- containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager));
+ containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager, faultDomainManager,
config));
allocatorWithHostAffinity = new
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state,
containerManager);
cpm = new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager, localityManager, false);
@@ -666,8 +669,9 @@ public class TestContainerPlacementActions {
new
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(),
2, this.server));
ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
ContainerManager containerManager =
- new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, false, false, localityManager);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, false, false, localityManager, faultDomainManager,
config);
MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager,
new MapConfig(conf), state,
containerManager);
@@ -801,8 +805,9 @@ public class TestContainerPlacementActions {
new
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(),
2, this.server));
ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
ContainerManager containerManager =
- spy(new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager));
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, localityManager, faultDomainManager,
config));
MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
new MockContainerAllocatorWithHostAffinity(clusterResourceManager,
config, state, containerManager);
ContainerProcessManager cpm = new ContainerProcessManager(
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index bcbe53f..ad45c5e 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,11 +141,12 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManager(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readLocality())
.thenReturn(new LocalityModel(ImmutableMap.of("0", new
ProcessorLocality("0", "host1"))));
ContainerManager containerManager =
- buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, mockLocalityManager);
+ buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false, mockLocalityManager, faultDomainManager);
ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(new
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
@@ -592,6 +593,7 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManager(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
LocalityManager mockLocalityManager = mock(LocalityManager.class);
if (withHostAffinity) {
@@ -604,7 +606,7 @@ public class TestContainerProcessManager {
ContainerManager containerManager =
buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
- clusterManagerConfig.getHostAffinityEnabled(), false,
mockLocalityManager);
+ clusterManagerConfig.getHostAffinityEnabled(), false,
mockLocalityManager, faultDomainManager);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -614,7 +616,7 @@ public class TestContainerProcessManager {
ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.of(allocator),
- mockLocalityManager, false);
+ mockLocalityManager, false, faultDomainManager);
// start triggers a request
cpm.start();
@@ -755,11 +757,12 @@ public class TestContainerProcessManager {
configMap.putAll(getConfig());
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readLocality())
.thenReturn(new LocalityModel(ImmutableMap.of("0", new
ProcessorLocality("1", "host1"))));
ContainerManager containerManager =
buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
-
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false,
mockLocalityManager);
+
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false,
mockLocalityManager, faultDomainManager);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -792,11 +795,12 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManager(2));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readLocality())
.thenReturn(new LocalityModel(ImmutableMap.of("0", new
ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
ContainerManager containerManager =
buildContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
-
Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)),
false, mockLocalityManager);
+
Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)),
false, mockLocalityManager, faultDomainManager);
MockContainerAllocatorWithHostAffinity allocator = new
MockContainerAllocatorWithHostAffinity(
clusterResourceManager,
@@ -806,7 +810,7 @@ public class TestContainerProcessManager {
ContainerProcessManager cpm =
spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state,
clusterResourceManager,
- Optional.of(allocator), mockLocalityManager, false));
+ Optional.of(allocator), mockLocalityManager, false,
faultDomainManager));
cpm.start();
assertFalse(cpm.shouldShutdown());
@@ -1031,16 +1035,16 @@ public class TestContainerProcessManager {
SamzaApplicationState samzaApplicationState, ClusterResourceManager
clusterResourceManager,
boolean hostAffinityEnabled, boolean standByEnabled) {
LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new
HashMap<>()));
return buildContainerManager(containerPlacementMetadataStore,
samzaApplicationState, clusterResourceManager,
- hostAffinityEnabled, standByEnabled, mockLocalityManager);
+ hostAffinityEnabled, standByEnabled, mockLocalityManager,
faultDomainManager);
}
private ContainerManager
buildContainerManager(ContainerPlacementMetadataStore
containerPlacementMetadataStore,
- SamzaApplicationState samzaApplicationState, ClusterResourceManager
clusterResourceManager,
- boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager
localityManager) {
- return new ContainerManager(containerPlacementMetadataStore,
samzaApplicationState, clusterResourceManager,
- hostAffinityEnabled, standByEnabled, localityManager);
+ SamzaApplicationState samzaApplicationState, ClusterResourceManager
clusterResourceManager, boolean hostAffinityEnabled,
+ boolean standByEnabled, LocalityManager localityManager,
FaultDomainManager faultDomainManager) {
+ return new ContainerManager(containerPlacementMetadataStore,
samzaApplicationState, clusterResourceManager, hostAffinityEnabled,
standByEnabled, localityManager, faultDomainManager, config);
}
private ContainerProcessManager
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
ClusterResourceManager clusterResourceManager,
Optional<ContainerAllocator> allocator) {
@@ -1050,16 +1054,17 @@ public class TestContainerProcessManager {
private ContainerProcessManager
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
ClusterResourceManager clusterResourceManager,
Optional<ContainerAllocator> allocator, boolean restartContainer) {
LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new
HashMap<>()));
return buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, allocator,
- mockLocalityManager, restartContainer);
+ mockLocalityManager, restartContainer, faultDomainManager);
}
private ContainerProcessManager
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
ClusterResourceManager clusterResourceManager,
Optional<ContainerAllocator> allocator, LocalityManager localityManager,
- boolean restartContainers) {
+ boolean restartContainers, FaultDomainManager faultDomainManager) {
return new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(), clusterResourceManager,
allocator, buildContainerManager(containerPlacementMetadataStore,
state, clusterResourceManager,
- clusterManagerConfig.getHostAffinityEnabled(), false,
localityManager), localityManager, restartContainers);
+ clusterManagerConfig.getHostAffinityEnabled(), false, localityManager,
faultDomainManager), localityManager, restartContainers);
}
}
diff --git
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index fa784e0..ccdd00f 100644
---
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -241,6 +241,7 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
String processorId = resourceRequest.getProcessorId();
String requestId = resourceRequest.getRequestId();
String preferredHost = resourceRequest.getPreferredHost();
+ String[] racks =
resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
int memoryMb = resourceRequest.getMemoryMB();
int cpuCores = resourceRequest.getNumCores();
Resource capability = Resource.newInstance(memoryMb, cpuCores);
@@ -261,15 +262,15 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
boolean relaxLocality = true;
log.info("Requesting resources for Processor ID: {} on nodes: {} on
racks: {} with capability: {}, priority: {}, relaxLocality: {},
nodeLabelsExpression: {}",
- processorId, null, null, capability, priority, relaxLocality,
nodeLabelsExpression);
+ processorId, null, Arrays.toString(racks), capability, priority,
relaxLocality, nodeLabelsExpression);
issuedRequest = new AMRMClient.ContainerRequest(capability, null, null,
priority, relaxLocality, nodeLabelsExpression);
} else {
String[] nodes = {preferredHost};
Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
boolean relaxLocality = false;
log.info("Requesting resources for Processor ID: {} on nodes: {} on
racks: {} with capability: {}, priority: {}, relaxLocality: {},
nodeLabelsExpression: {}",
- processorId, Arrays.toString(nodes), null, capability, priority,
relaxLocality, nodeLabelsExpression);
- issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, null,
priority, relaxLocality, nodeLabelsExpression);
+ processorId, Arrays.toString(nodes), Arrays.toString(racks),
capability, priority, relaxLocality, nodeLabelsExpression);
+ issuedRequest = new AMRMClient.ContainerRequest(capability, nodes,
racks, priority, relaxLocality, nodeLabelsExpression);
}
// ensure that updating the state and making the request are done
atomically.
synchronized (lock) {