This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new b4a9fe6 SAMZA-2402: Tie Container placement service and Container
placement handler and validate placement requests (#1267)
b4a9fe6 is described below
commit b4a9fe67238d6eb81b2c61177cafd276ade67c07
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Feb 18 12:32:05 2020 -0800
SAMZA-2402: Tie Container placement service and Container placement handler
and validate placement requests (#1267)
Tie Container placement service and Container placement handler and
validate placement requests
---
.../ContainerPlacementResponseMessage.java | 6 +-
.../clustermanager/ClusterBasedJobCoordinator.java | 11 +-
.../samza/clustermanager/ContainerManager.java | 133 +++++++----
.../clustermanager/ContainerProcessManager.java | 9 +-
.../ContainerPlacementMetadata.java | 14 +-
.../placement/ContainerPlacementMetadataStore.java | 5 +-
.../ContainerPlacementRequestAllocator.java | 19 +-
.../apache/samza/util/BoundedLinkedHashSet.java | 53 +++++
.../TestContainerAllocatorWithHostAffinity.java | 23 +-
.../TestContainerAllocatorWithoutHostAffinity.java | 18 +-
.../TestContainerPlacementActions.java | 263 +++++++++++++++++----
.../TestContainerProcessManager.java | 47 +++-
12 files changed, 480 insertions(+), 121 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
b/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
index 996bfe8..73d8287 100644
---
a/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
+++
b/samza-api/src/main/java/org/apache/samza/container/placement/ContainerPlacementResponseMessage.java
@@ -42,7 +42,11 @@ public class ContainerPlacementResponseMessage extends
ContainerPlacementMessage
this(uuid, deploymentId, processorId, destinationHost, null, statusCode,
responseMessage, timestamp);
}
- static ContainerPlacementResponseMessage
fromContainerPlacementRequestMessage(
+ /**
+ * Creates a {@link ContainerPlacementResponseMessage} using {@link
ContainerPlacementRequestMessage}
+ * status of current request and response associated with status
+ */
+ public static ContainerPlacementResponseMessage
fromContainerPlacementRequestMessage(
ContainerPlacementRequestMessage requestMessage, StatusCode statusCode,
String responseMessage, long timestamp) {
return new ContainerPlacementResponseMessage(requestMessage.getUuid(),
requestMessage.getDeploymentId(), requestMessage.getProcessorId(),
requestMessage.getDestinationHost(),
requestMessage.getRequestExpiry(), statusCode, responseMessage, timestamp);
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 2b9636c..f894bcf 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -38,6 +38,7 @@ import
org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -217,12 +218,16 @@ public class ClusterBasedJobCoordinator {
this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
this.jobCoordinatorSleepInterval =
clusterManagerConfig.getJobCoordinatorSleepInterval();
+ // build metastore for container placement messages
+ containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(metadataStore);
+
// build a container process Manager
containerProcessManager = createContainerProcessManager();
// build utils related to container placements
- containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(metadataStore);
- containerPlacementRequestAllocator = new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore,
containerProcessManager);
+ containerPlacementRequestAllocator =
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore,
containerProcessManager,
+ new ApplicationConfig(config));
this.containerPlacementRequestAllocatorThread =
new Thread(containerPlacementRequestAllocator, "Samza-" +
ContainerPlacementRequestAllocator.class.getSimpleName());
}
@@ -452,7 +457,7 @@ public class ClusterBasedJobCoordinator {
@VisibleForTesting
ContainerProcessManager createContainerProcessManager() {
- return new ContainerProcessManager(config, state, metrics);
+ return new ContainerProcessManager(config, state, metrics,
containerPlacementMetadataStore);
}
/**
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 79076d8..c924447 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -25,9 +25,12 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import
org.apache.samza.clustermanager.container.placements.ContainerPlacementMetadata;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
import org.apache.samza.container.placement.ContainerPlacementMessage;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
+import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
+import org.apache.samza.util.BoundedLinkedHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +55,13 @@ public class ContainerManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerManager.class);
private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+ private static final int UUID_CACHE_SIZE = 20000;
/**
+ * Container placement metadata store to write responses to control actions
+ */
+ private final ContainerPlacementMetadataStore
containerPlacementMetadataStore;
+ /**
* Resource-manager, used to stop containers
*/
private final ClusterResourceManager clusterResourceManager;
@@ -65,15 +73,23 @@ public class ContainerManager {
* Key is chosen to be processorId since at a time only one placement action
can be in progress on a container.
*/
private final ConcurrentHashMap<String, ContainerPlacementMetadata> actions;
+ /**
+ * In-memory cache of placement requests UUIDs de-queued from the metadata
store. Used to de-dup requests with the same
+ * request UUID. Sized using max tolerable memory footprint and max likely
duplicate-spacing.
+ */
+ private final BoundedLinkedHashSet<UUID> placementRequestsCache;
private final Optional<StandbyContainerManager> standbyContainerManager;
- public ContainerManager(SamzaApplicationState samzaApplicationState,
ClusterResourceManager clusterResourceManager,
+ public ContainerManager(ContainerPlacementMetadataStore
containerPlacementMetadataStore,
+ SamzaApplicationState samzaApplicationState, ClusterResourceManager
clusterResourceManager,
boolean hostAffinityEnabled, boolean standByEnabled) {
this.samzaApplicationState = samzaApplicationState;
this.clusterResourceManager = clusterResourceManager;
this.actions = new ConcurrentHashMap<>();
+ this.placementRequestsCache = new
BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
this.hostAffinityEnabled = hostAffinityEnabled;
+ this.containerPlacementMetadataStore = containerPlacementMetadataStore;
// Enable standby container manager if required
if (standByEnabled) {
this.standbyContainerManager =
@@ -110,12 +126,14 @@ public class ContainerManager {
ContainerPlacementMetadata actionMetaData =
getPlacementActionMetadata(processorId).get();
ContainerPlacementMetadata.ContainerStatus actionStatus =
actionMetaData.getContainerStatus();
if (samzaApplicationState.runningProcessors.containsKey(processorId) &&
actionStatus == ContainerPlacementMetadata.ContainerStatus.RUNNING) {
- LOG.info("Requesting running container to shutdown due to existing
container placement action {}", actionMetaData);
+ LOG.debug("Requesting running container to shutdown due to existing
ContainerPlacement action {}", actionMetaData);
actionMetaData.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS);
+ updateContainerPlacementActionStatus(actionMetaData,
ContainerPlacementMessage.StatusCode.IN_PROGRESS,
+ "Active container stop in progress");
clusterResourceManager.stopStreamProcessor(samzaApplicationState.runningProcessors.get(processorId));
return false;
} else if (actionStatus ==
ContainerPlacementMetadata.ContainerStatus.STOP_IN_PROGRESS) {
- LOG.info("Waiting for running container to shutdown due to existing
container placement action {}", actionMetaData);
+ LOG.info("Waiting for running container to shutdown due to existing
ContainerPlacement action {}", actionMetaData);
return false;
} else if (actionStatus ==
ContainerPlacementMetadata.ContainerStatus.STOPPED) {
allocator.runStreamProcessor(request, preferredHost);
@@ -210,9 +228,10 @@ public class ContainerManager {
void handleContainerLaunchSuccess(String processorId) {
if (hasActiveContainerPlacementAction(processorId)) {
ContainerPlacementMetadata metadata =
getPlacementActionMetadata(processorId).get();
- metadata.setActionStatus(ContainerPlacementMessage.StatusCode.SUCCEEDED,
- "Successfully completed the container placement action");
- LOG.info("Container placement action succeeded {}", metadata);
+ // Mark the active container running again and dispatch a response
+
metadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.RUNNING);
+ updateContainerPlacementActionStatus(metadata,
ContainerPlacementMessage.StatusCode.SUCCEEDED,
+ "Successfully completed the container placement action");
}
}
@@ -279,40 +298,49 @@ public class ContainerManager {
* When host affinity is enabled move / restart is allowed on specific or
ANY_HOST
* TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
*
+ * Container placement requests are tied to deploymentId which is currently
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
+ * On job restarts container placement requests queued for the previous
deployment are deleted using this
+ *
* @param requestMessage request containing logical processor id 0,1,2 and
host where container is desired to be moved,
* acceptable values of this param are any valid
hostname or "ANY_HOST"(in this case the request
* is sent to resource manager for any host)
* @param containerAllocator to request physical resources
*/
public void
registerContainerPlacementAction(ContainerPlacementRequestMessage
requestMessage, ContainerAllocator containerAllocator) {
- LOG.info("Received a ContainerPlacement action request: {}",
requestMessage);
String processorId = requestMessage.getProcessorId();
String destinationHost = requestMessage.getDestinationHost();
- Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
- checkValidControlAction(processorId, destinationHost,
requestMessage.getUuid());
-
+ // Is the action ready to be de-queued and taken or it needs to wait to be
executed in future
+ if (!deQueueAction(requestMessage)) {
+ return;
+ }
+ Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
validatePlacementAction(requestMessage);
+ LOG.info("ContainerPlacement action is de-queued metadata: {}",
requestMessage);
+ // Action is de-queued upon so we record it in the cache
+ placementRequestsCache.put(requestMessage.getUuid());
+ // Remove the request message from metastore since this message is already
acted upon
+
containerPlacementMetadataStore.deleteContainerPlacementRequestMessage(requestMessage.getUuid());
// Request is bad just update the response on message & return
if (actionStatus.getKey() ==
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+ writeContainerPlacementResponseMessage(requestMessage,
actionStatus.getKey(), actionStatus.getValue());
return;
}
SamzaResource currentResource =
samzaApplicationState.runningProcessors.get(processorId);
- LOG.info(
- "Processor ID: {} matched an active container with deployment ID: {}
is running on host: {} for ContainerPlacement action: {}",
+ LOG.info("Processor ID: {} matched an active container with containerId
ID: {} is running on host: {} for ContainerPlacement action: {}",
processorId, currentResource.getContainerId(),
currentResource.getHost(), requestMessage);
+ // TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers
to specific host
if (!hostAffinityEnabled) {
- LOG.info("Changing the requested host for placement action to {} because
host affinity is disabled",
- ResourceRequestState.ANY_HOST);
+ LOG.info("Changing the requested host for placement action to {} because
host affinity is disabled", ResourceRequestState.ANY_HOST);
destinationHost = ANY_HOST;
}
SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(processorId, destinationHost);
ContainerPlacementMetadata actionMetaData = new
ContainerPlacementMetadata(requestMessage, currentResource.getHost());
// Record the resource request for monitoring
-
actionMetaData.setActionStatus(ContainerPlacementMessage.StatusCode.IN_PROGRESS,
"Preferred Resources requested");
actionMetaData.recordResourceRequest(resourceRequest);
actions.put(processorId, actionMetaData);
+ updateContainerPlacementActionStatus(actionMetaData,
ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources
requested");
containerAllocator.issueResourceRequest(resourceRequest);
LOG.info("Issued resource request for preferred resources for
ContainerPlacement action: {}", actionMetaData);
}
@@ -342,8 +370,7 @@ public class ContainerManager {
}
private void markContainerPlacementActionFailed(ContainerPlacementMetadata
metaData, String failureMessage) {
- metaData.setActionStatus(ContainerPlacementMessage.StatusCode.FAILED,
failureMessage);
- LOG.info("Container Placement action failed with metadata {}", metaData);
+ updateContainerPlacementActionStatus(metaData,
ContainerPlacementMessage.StatusCode.FAILED, failureMessage);
}
/**
@@ -354,7 +381,6 @@ public class ContainerManager {
if (metadata.isPresent()) {
switch (metadata.get().getActionStatus()) {
case ACCEPTED:
- case CREATED:
case IN_PROGRESS:
return true;
default:
@@ -371,39 +397,62 @@ public class ContainerManager {
return Optional.ofNullable(this.actions.get(processorId));
}
+ private void updateContainerPlacementActionStatus(ContainerPlacementMetadata
metadata,
+ ContainerPlacementMessage.StatusCode statusCode, String responseMessage)
{
+ metadata.setActionStatus(statusCode, responseMessage);
+ writeContainerPlacementResponseMessage(metadata.getRequestMessage(),
statusCode, responseMessage);
+ LOG.info("Status updated for ContainerPlacement action: {}", metadata);
+ }
+
+ private void
writeContainerPlacementResponseMessage(ContainerPlacementRequestMessage
requestMessage,
+ ContainerPlacementMessage.StatusCode statusCode, String responseMessage)
{
+ containerPlacementMetadataStore.writeContainerPlacementResponseMessage(
+
ContainerPlacementResponseMessage.fromContainerPlacementRequestMessage(requestMessage,
statusCode,
+ responseMessage, System.currentTimeMillis()));
+ }
+
/**
- * A valid container placement action is only issued for a running processor
with a valid processor id which has no
- * in flight container requests. Duplicate actions are handled by deduping
on uuid
+ * If there is an existing inflight request or container is pending a start,
the container placement action shall wait
+ * until this in-flight action is complete
*
- * TODO: SAMZA-2402: Disallow pending Container Placement actions in
metastore on job restarts
+ * @param requestMessage container placement request message
+ * @return true if action should be taken right now, false if it needs to
wait to be taken in future
*/
- private Pair<ContainerPlacementMessage.StatusCode, String>
checkValidControlAction(String processorId, String destinationHost, UUID uuid) {
- String errorMessagePrefix =
- String.format("ControlAction to move or restart container with
processor id %s to host %s is rejected due to",
- processorId, destinationHost);
+ private boolean deQueueAction(ContainerPlacementRequestMessage
requestMessage) {
+ // Do not dequeue action wait for the in-flight action to complete
+ if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
+ LOG.info("ContainerPlacement request: {} is en-queued because container
has an in-progress placement action", requestMessage);
+ return false;
+ }
+ // Do not dequeue the action wait for the container to come to a running
state
+ if
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
+ ||
samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId()))
{
+ LOG.info("ContainerPlacement request: {} is en-queued because container
is pending start", requestMessage);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * A valid container placement action needs a valid processor id. Duplicate
actions are handled by de-duping on uuid.
+ *
+ * @param requestMessage container placement request message
+ * @return Pair<ContainerPlacementMessage.StatusCode, String> which is
status code & response suggesting if the request is valid
+ */
+ private Pair<ContainerPlacementMessage.StatusCode, String>
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
+ String errorMessagePrefix = String.format("ContainerPlacement request: %s
is rejected due to", requestMessage);
Boolean invalidAction = false;
String errorMessage = null;
if (standbyContainerManager.isPresent()) {
- errorMessage = String.format("%s not supported for host standby
enabled", errorMessagePrefix);
+ errorMessage = String.format("%s not supported for hot standby enabled",
errorMessagePrefix);
invalidAction = true;
- } else if (processorId == null || destinationHost == null) {
- errorMessage = String.format("%s either processor id or the host
argument is null", errorMessagePrefix);
+ } else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
+ errorMessage = String.format("%s duplicate UUID of the request, please
retry", errorMessagePrefix);
invalidAction = true;
- } else if (Integer.parseInt(processorId) >=
samzaApplicationState.processorCount.get()) {
+ } else if (Integer.parseInt(requestMessage.getProcessorId()) >=
samzaApplicationState.processorCount.get()
+ ) {
errorMessage = String.format("%s invalid processor id",
errorMessagePrefix);
invalidAction = true;
- } else if (hasActiveContainerPlacementAction(processorId)) {
- errorMessage = String.format("%s existing container placement action on
container with metadata %s", errorMessagePrefix,
- actions.get(processorId));
- invalidAction = true;
- } else if (actions.get(processorId) != null &&
actions.get(processorId).getUuid().equals(uuid)) {
- errorMessage = String.format("%s duplicate Container Placement request
received, previous action taken: %s", errorMessagePrefix,
- actions.get(processorId));
- invalidAction = true;
- } else if
(!samzaApplicationState.runningProcessors.containsKey(processorId)
- || samzaApplicationState.pendingProcessors.containsKey(processorId)) {
- errorMessage = String.format("%s container is either is not running or
is in pending state", errorMessagePrefix);
- invalidAction = true;
}
if (invalidAction) {
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 6ba7e7f..7f99159 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.SamzaException;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -127,7 +128,8 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
private JvmMetrics jvmMetrics;
private Map<String, MetricsReporter> metricsReporters;
- public ContainerProcessManager(Config config, SamzaApplicationState state,
MetricsRegistryMap registry) {
+ public ContainerProcessManager(Config config, SamzaApplicationState state,
MetricsRegistryMap registry,
+ ContainerPlacementMetadataStore metadataStore) {
this.state = state;
this.clusterManagerConfig = new ClusterManagerConfig(config);
this.jobConfig = new JobConfig(config);
@@ -159,7 +161,8 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
// Wire all metrics to all reporters
this.metricsReporters.values().forEach(reporter ->
reporter.register(METRICS_SOURCE_NAME, registry));
- this.containerManager = new ContainerManager(state,
clusterResourceManager, hostAffinityEnabled,
jobConfig.getStandbyTasksEnabled());
+ this.containerManager = new ContainerManager(metadataStore, state,
clusterResourceManager, hostAffinityEnabled,
+ jobConfig.getStandbyTasksEnabled());
this.containerAllocator = new
ContainerAllocator(this.clusterResourceManager, config, state,
hostAffinityEnabled, this.containerManager);
this.allocatorThread = new Thread(this.containerAllocator, "Container
Allocator Thread");
@@ -572,7 +575,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
* @param requestMessage request containing details of the desited container
placement action
*/
public void
registerContainerPlacementAction(ContainerPlacementRequestMessage
requestMessage) {
- // Call the ContainerManager#registerContainerPlacementAction
+ containerManager.registerContainerPlacementAction(requestMessage,
containerAllocator);
}
private Duration getRetryDelay(String processorId) {
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
similarity index 89%
rename from
samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
rename to
samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
index fc521a1..15c9e1c 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placements/ContainerPlacementMetadata.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadata.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.clustermanager.container.placements;
+package org.apache.samza.clustermanager.container.placement;
import java.time.Duration;
import java.util.HashSet;
@@ -88,6 +88,10 @@ public class ContainerPlacementMetadata {
return actionStatus;
}
+ public ContainerPlacementRequestMessage getRequestMessage() {
+ return requestMessage;
+ }
+
public synchronized String getResponseMessage() {
return responseMessage;
}
@@ -111,12 +115,16 @@ public class ContainerPlacementMetadata {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ContainerPlacementMetadata{");
- sb.append("Request= ").append(requestMessage);
+ sb.append(" UUID: ").append(requestMessage.getUuid());
+ sb.append(", Processor ID: ").append(requestMessage.getProcessorId());
+ sb.append(",
deploymentId='").append(requestMessage.getDeploymentId()).append('\'');
+ sb.append(",
destinationHost='").append(requestMessage.getDestinationHost()).append('\'');
+ sb.append(", requestExpiry=").append(requestMessage.getRequestExpiry());
sb.append(", sourceHost='").append(sourceHost).append('\'');
- sb.append(", resourceRequests=").append(resourceRequests);
sb.append(", actionStatus=").append(actionStatus);
sb.append(", containerStatus=").append(containerStatus);
sb.append(", responseMessage='").append(responseMessage).append('\'');
+ sb.append(", resourceRequests=").append(resourceRequests);
sb.append('}');
return sb.toString();
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
index 02f698d..593cf3d 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -132,7 +133,7 @@ public class ContainerPlacementMetadataStore {
* only to write responses to Container Placement Action
* @param message
*/
- void
writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage
message) {
+ public void
writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage
message) {
Preconditions.checkState(!stopped, "Underlying metadata store not
available");
Preconditions.checkNotNull(message);
try {
@@ -264,6 +265,8 @@ public class ContainerPlacementMetadataStore {
throw new SamzaException(e);
}
}
+ // Sort the actions in order of timestamp
+
newActions.sort(Comparator.comparingLong(ContainerPlacementRequestMessage::getTimestamp));
return newActions;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
index f01ac41..5161cfb 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
@@ -20,12 +20,15 @@ package org.apache.samza.clustermanager.container.placement;
import com.google.common.base.Preconditions;
import org.apache.samza.clustermanager.ContainerProcessManager;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Stateless handler that periodically dispatches {@link
ContainerPlacementRequestMessage} read from Metadata store to Job Coordinator
+ * Container placement requests from the previous deployment are deleted from
the metadata store, ContainerPlacementRequestAllocatorThread
+ * does this cleanup
*/
public class ContainerPlacementRequestAllocator implements Runnable {
@@ -43,13 +46,18 @@ public class ContainerPlacementRequestAllocator implements
Runnable {
* State that controls the lifecycle of the
ContainerPlacementRequestAllocator thread
*/
private volatile boolean isRunning;
+ /**
+ * RunId of the app
+ */
+ private final String appRunId;
- public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore
containerPlacementMetadataStore, ContainerProcessManager manager) {
+ public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore
containerPlacementMetadataStore, ContainerProcessManager manager,
ApplicationConfig config) {
Preconditions.checkNotNull(containerPlacementMetadataStore,
"containerPlacementMetadataStore cannot be null");
Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be
null");
this.containerProcessManager = manager;
this.containerPlacementMetadataStore = containerPlacementMetadataStore;
this.isRunning = true;
+ this.appRunId = config.getRunId();
}
@Override
@@ -59,8 +67,13 @@ public class ContainerPlacementRequestAllocator implements
Runnable {
for (ContainerPlacementRequestMessage message :
containerPlacementMetadataStore.readAllContainerPlacementRequestMessages()) {
// We do not need to dispatch ContainerPlacementResponseMessage
because they are written from JobCoordinator
// in response to a Container Placement Action
- LOG.info("Received a container placement message {}", message);
- containerProcessManager.registerContainerPlacementAction(message);
+ if (message.getDeploymentId().equals(appRunId)) {
+ LOG.debug("Received a container placement message {}", message);
+ containerProcessManager.registerContainerPlacementAction(message);
+ } else {
+ // Delete the ContainerPlacementMessages from the previous
deployment
+
containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
+ }
}
Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
} catch (InterruptedException e) {
diff --git
a/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java
b/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java
new file mode 100644
index 0000000..c99e475
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/BoundedLinkedHashSet.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
/**
 * Insertion-ordered set that holds at most {@code size} elements and evicts in FIFO order:
 * when adding a new element would exceed the bound, the oldest inserted element is dropped.
 *
 * Re-adding an element that is already present is a no-op; it neither evicts anything nor
 * refreshes the element's position in the eviction order. A non-positive {@code size} yields
 * a set that retains nothing.
 *
 * This class is not thread-safe
 *
 * @param <T> type of the elements held in the set
 */
public class BoundedLinkedHashSet<T> {

  private final int cacheSize;
  private final Set<T> cache;

  /**
   * @param size maximum number of elements retained before the oldest one is evicted
   */
  public BoundedLinkedHashSet(int size) {
    this.cache = new LinkedHashSet<T>();
    this.cacheSize = size;
  }

  /**
   * @param element element to look up
   * @return true if {@code element} is currently retained in the set
   */
  public boolean containsKey(T element) {
    return cache.contains(element);
  }

  /**
   * Adds {@code element}, evicting the oldest retained element(s) first if the set is full.
   *
   * @param element element to add
   */
  public void put(T element) {
    // Nothing to do when the set can hold nothing, or when the element is already retained
    // (evicting in that case would shrink the set without adding anything new).
    if (cacheSize <= 0 || cache.contains(element)) {
      return;
    }
    // Make room before inserting so the set never exceeds cacheSize elements.
    // Iterator#next must be called before Iterator#remove, otherwise remove throws
    // IllegalStateException.
    Iterator<T> iterator = cache.iterator();
    while (cache.size() >= cacheSize && iterator.hasNext()) {
      iterator.next();
      iterator.remove();
    }
    cache.add(element);
  }
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index b7a527d..bf2eabf 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -28,11 +28,14 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -60,7 +63,8 @@ public class TestContainerAllocatorWithHostAffinity {
private final SamzaApplicationState state = new
SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- private final ContainerManager containerManager = new
ContainerManager(state, clusterResourceManager, true, false);
+ private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+ private ContainerManager containerManager;
private JobModelManager initializeJobModelManager(Config config, int
containerCount) {
Map<String, Map<String, String>> localityMap = new HashMap<>();
@@ -83,6 +87,12 @@ public class TestContainerAllocatorWithHostAffinity {
@Before
public void setup() throws Exception {
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config);
+ CoordinatorStreamStore coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ coordinatorStreamStore.init();
+ containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
+ containerPlacementMetadataStore.start();
+ containerManager = new ContainerManager(containerPlacementMetadataStore,
state, clusterResourceManager, true, false);
containerAllocator =
new ContainerAllocator(clusterResourceManager, config, state, true,
containerManager);
requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -361,7 +371,8 @@ public class TestContainerAllocatorWithHostAffinity {
public void testRequestAllocationOnPreferredHostWithRunStreamProcessor()
throws Exception {
ClusterResourceManager.Callback mockCPM =
mock(MockClusterResourceManagerCallback.class);
ClusterResourceManager mockClusterResourceManager = new
MockClusterResourceManager(mockCPM, state);
- ContainerManager containerManager = new ContainerManager(state,
mockClusterResourceManager, true, false);
+ ContainerManager containerManager =
+ new ContainerManager(containerPlacementMetadataStore, state,
mockClusterResourceManager, true, false);
// Mock the callback from ClusterManager to add resources to the allocator
doAnswer((InvocationOnMock invocation) -> {
SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0,
List.class).get(0);
@@ -407,7 +418,8 @@ public class TestContainerAllocatorWithHostAffinity {
@Test
public void testExpiredRequestAllocationOnAnyHost() throws Exception {
MockClusterResourceManager spyManager = spy(new
MockClusterResourceManager(callback, state));
- ContainerManager spyContainerManager = spy(new ContainerManager(state,
spyManager, true, false));
+ ContainerManager spyContainerManager =
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
spyManager, true, false));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyManager, config, state, true,
spyContainerManager));
// Request Preferred Resources
@@ -450,7 +462,8 @@ public class TestContainerAllocatorWithHostAffinity {
public void
testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws
Exception {
// Add Extra Resources
MockClusterResourceManager spyClusterResourceManager = spy(new
MockClusterResourceManager(callback, state));
- ContainerManager spyContainerManager = spy(new ContainerManager(state,
spyClusterResourceManager, true, false));
+ ContainerManager spyContainerManager =
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
spyClusterResourceManager, true, false));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyClusterResourceManager, config, state, true,
spyContainerManager));
@@ -596,6 +609,8 @@ public class TestContainerAllocatorWithHostAffinity {
put("systems.test-system.samza.key.serde",
"org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde",
"org.apache.samza.serializers.JsonSerde");
put("job.host-affinity.enabled", "true");
+ put("job.name", "test-job");
+ put("job.coordinator.system", "test-kafka");
}
});
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index b81e249..9d55218 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -23,10 +23,13 @@ import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -57,6 +60,9 @@ public class TestContainerAllocatorWithoutHostAffinity {
private final SamzaApplicationState state = new
SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager manager = new
MockClusterResourceManager(callback, state);
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+
private ContainerAllocator containerAllocator;
private MockContainerRequestState requestState;
private Thread allocatorThread;
@@ -66,8 +72,13 @@ public class TestContainerAllocatorWithoutHostAffinity {
@Before
public void setup() throws Exception {
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config);
+ CoordinatorStreamStore coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ coordinatorStreamStore.init();
+ containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
+ containerPlacementMetadataStore.start();
containerAllocator = new ContainerAllocator(manager, config, state, false,
- new ContainerManager(state, manager, false, false));
+ new ContainerManager(containerPlacementMetadataStore, state, manager,
false, false));
requestState = new MockContainerRequestState(manager, false);
Field requestStateField =
containerAllocator.getClass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
@@ -94,6 +105,8 @@ public class TestContainerAllocatorWithoutHostAffinity {
put("systems.test-system.samza.factory",
"org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde",
"org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde",
"org.apache.samza.serializers.JsonSerde");
+ put("job.name", "test-job");
+ put("job.coordinator.system", "test-kafka");
}
});
@@ -262,7 +275,8 @@ public class TestContainerAllocatorWithoutHostAffinity {
ClusterResourceManager.Callback mockCPM =
mock(ClusterResourceManager.Callback.class);
ClusterResourceManager mockManager = new
MockClusterResourceManager(mockCPM, state);
- ContainerManager spyContainerManager = spy(new ContainerManager(state,
mockManager, false, false));
+ ContainerManager spyContainerManager =
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
mockManager, false, false));
spyAllocator = Mockito.spy(
new ContainerAllocator(mockManager, config, state, false,
spyContainerManager));
// Mock the callback from ClusterManager to add resources to the allocator
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index fd0008e..b433cba 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -28,15 +28,21 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
-import
org.apache.samza.clustermanager.container.placements.ContainerPlacementMetadata;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.placement.ContainerPlacementMessage;
import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
+import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -53,6 +59,9 @@ import org.mockito.stubbing.Answer;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+/**
+ * Set of Integration tests for container placement actions
+ */
@RunWith(MockitoJUnitRunner.class)
public class TestContainerPlacementActions {
@@ -71,11 +80,23 @@ public class TestContainerPlacementActions {
put("systems.test-system.samza.factory",
"org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde",
"org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde",
"org.apache.samza.serializers.JsonSerde");
+ put("job.name", "test-job");
+ put("job.coordinator.system", "test-kafka");
+ put("app.run.id", "appAttempt-001");
}
};
private Config config = new MapConfig(configVals);
+ private CoordinatorStreamStore coordinatorStreamStore;
+ private ContainerPlacementMetadataStore containerPlacementMetadataStore;
+
+ private SamzaApplicationState state;
+ private ContainerManager containerManager;
+ private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
+ private ContainerProcessManager cpm;
+ private ClusterResourceManager.Callback callback;
+
private Config getConfig() {
Map<String, String> map = new HashMap<>();
map.putAll(config);
@@ -108,26 +129,27 @@ public class TestContainerPlacementActions {
@Before
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new
ServletHolder(DefaultServlet.class));
- }
-
- @Test(timeout = 10000)
- public void testContainerSuccessfulMoveAction() throws Exception {
+ // Utils Related to Container Placement Metadata store
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config);
+ coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ coordinatorStreamStore.init();
+ containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
+ containerPlacementMetadataStore.start();
+ // Utils Related to Cluster manager:
Map<String, String> conf = new HashMap<>();
conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
-
- SamzaApplicationState state =
- new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
-
- ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
+ state = new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
+ callback = mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
- ContainerManager containerManager = spy(new ContainerManager(state,
clusterResourceManager, true, false));
- MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
- new MockContainerAllocatorWithHostAffinity(clusterResourceManager,
config, state, containerManager);
- ContainerProcessManager cpm =
- new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
+ containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false));
+ allocatorWithHostAffinity = new
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state,
containerManager);
+ cpm = new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
+ }
+ @Test(timeout = 10000)
+ public void testContainerSuccessfulMoveAction() throws Exception {
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
new Thread(() -> {
@@ -179,6 +201,7 @@ public class TestContainerPlacementActions {
ContainerPlacementRequestMessage requestMessage =
new ContainerPlacementRequestMessage(UUID.randomUUID(),
"appAttempt-001", "0", "host-3",
System.currentTimeMillis());
+
ContainerPlacementMetadata metadata =
containerManager.registerContainerPlacementActionForTest(requestMessage,
allocatorWithHostAffinity);
@@ -197,24 +220,120 @@ public class TestContainerPlacementActions {
assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
assertEquals(state.anyHostRequests.get(), 0);
assertEquals(metadata.getActionStatus(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+ assertResponseMessage(responseMessage.get(), requestMessage);
}
- @Test(timeout = 10000)
- public void
testContainerMoveActionExpiredRequestNotAffectRunningContainers() throws
Exception {
- Map<String, String> conf = new HashMap<>();
- conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
+ @Test(timeout = 30000)
+ public void testActionQueuingForConsecutivePlacementActions() throws
Exception {
+ // Spawn a Request Allocator Thread
+ Thread requestAllocatorThread = new Thread(
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config)),
+ "ContainerPlacement Request Allocator Thread");
+ requestAllocatorThread.start();
- SamzaApplicationState state =
- new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onResourcesAvailable((List<SamzaResource>) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onResourcesAvailable(anyList());
- ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
- MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = spy(new ContainerManager(state,
clusterResourceManager, true, false));
- MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
- new MockContainerAllocatorWithHostAffinity(clusterResourceManager,
config, state, containerManager);
- ContainerProcessManager cpm =
- new ContainerProcessManager(new ClusterManagerConfig(new
MapConfig(conf)), state, new MetricsRegistryMap(),
- clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onStreamProcessorLaunchSuccess(any());
+
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onResourcesCompleted(anyList());
+
+ cpm.start();
+
+ if (!allocatorWithHostAffinity.awaitContainersStart(2, 5,
TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+
+ while (state.runningProcessors.size() != 2) {
+ Thread.sleep(100);
+ }
+
+ // App is in running state with two containers running
+ assertEquals(state.runningProcessors.size(), 2);
+ assertEquals(state.runningProcessors.get("0").getHost(), "host-1");
+ assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
+ assertEquals(state.preferredHostRequests.get(), 2);
+ assertEquals(state.anyHostRequests.get(), 0);
+
+ // Initiate container placement action to move a container with container
id 0
+
+ UUID requestUUIDMove1 =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0", "host-3",
+ null, System.currentTimeMillis());
+
+ UUID requestUUIDMoveBad =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-002",
"0", "host-4",
+ null, System.currentTimeMillis());
+
+ UUID requestUUIDMove2 =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0", "host-4",
+ null, System.currentTimeMillis());
+
+ // Wait for the ControlAction to complete
+ while (true) {
+ if
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).isPresent()
&&
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).get().getStatusCode()
+ == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ break;
+ }
+ Thread.sleep(Duration.ofSeconds(5).toMillis());
+ }
+
+ assertEquals(state.preferredHostRequests.get(), 4);
+ assertEquals(state.runningProcessors.size(), 2);
+ assertEquals(state.runningProcessors.get("0").getHost(), "host-4");
+ assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
+ assertEquals(state.anyHostRequests.get(), 0);
+
+ Optional<ContainerPlacementResponseMessage> responseMessageMove1 =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove1);
+
+ Optional<ContainerPlacementResponseMessage> responseMessageMove2 =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2);
+
+ assertTrue(responseMessageMove1.isPresent());
+ assertEquals(responseMessageMove1.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+ assertTrue(responseMessageMove2.isPresent());
+ assertEquals(responseMessageMove2.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+ // Request should be deleted as soon as it is accepted / being acted upon
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove1).isPresent());
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove2).isPresent());
+
+ // Requests from Previous deploy must be cleaned
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+ }
+
+ @Test(timeout = 10000)
+ public void
testContainerMoveActionExpiredRequestNotAffectRunningContainers() throws
Exception {
// Mimic the behavior of Expired request
doAnswer(new Answer<Void>() {
@@ -276,25 +395,19 @@ public class TestContainerPlacementActions {
assertEquals(state.runningProcessors.get("0").getHost(), "host-1");
assertEquals(state.runningProcessors.get("1").getHost(), "host-2");
assertEquals(state.anyHostRequests.get(), 0);
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.FAILED);
+ assertResponseMessage(responseMessage.get(), requestMessage);
+ // Request shall be deleted as soon as it is acted upon
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
@Test(timeout = 10000)
public void
testActiveContainerLaunchFailureOnControlActionShouldFallbackToSourceHost()
throws Exception {
- Map<String, String> conf = new HashMap<>();
- conf.putAll(getConfigWithHostAffinityAndRetries(true, 1, true));
-
- SamzaApplicationState state =
- new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
-
- ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
- MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = spy(new ContainerManager(state,
clusterResourceManager, true, false));
- MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
- new MockContainerAllocatorWithHostAffinity(clusterResourceManager,
config, state, containerManager);
- ContainerProcessManager cpm =
- new ContainerProcessManager(new ClusterManagerConfig(new
MapConfig(conf)), state, new MetricsRegistryMap(),
- clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
-
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
new Thread(() -> {
@@ -375,6 +488,16 @@ public class TestContainerPlacementActions {
assertEquals(state.anyHostRequests.get(), 0);
// Control Action should be failed in this case
assertEquals(metadata.getActionStatus(),
ContainerPlacementMessage.StatusCode.FAILED);
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.FAILED);
+ assertResponseMessage(responseMessage.get(), requestMessage);
+
+ // Request shall be deleted as soon as it is acted upon
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
@Test(timeout = 10000)
@@ -385,7 +508,8 @@ public class TestContainerPlacementActions {
new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state,
clusterResourceManager, false, false);
+ ContainerManager containerManager =
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, false, false);
MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager,
new MapConfig(conf), state,
containerManager);
@@ -479,6 +603,36 @@ public class TestContainerPlacementActions {
assertEquals(3, state.anyHostRequests.get());
// Action should success
assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED,
metadata.getActionStatus());
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+ assertResponseMessage(responseMessage.get(), requestMessage);
+
+ /**
+ * Inject a duplicate request and check it is not accepted
+ */
+ ContainerPlacementRequestMessage duplicateRequestToBeIgnored =
+ new ContainerPlacementRequestMessage(requestMessage.getUuid(),
"app-attempt-001", "1",
+ "host-3", System.currentTimeMillis());
+
+ // Request with a dup uuid should not be accepted
+ metadata =
containerManager.registerContainerPlacementActionForTest(duplicateRequestToBeIgnored,
+ allocatorWithoutHostAffinity);
+ // metadata should be from the previous completed action
+ assertTrue(metadata == null || metadata.getUuid() !=
duplicateRequestToBeIgnored.getUuid());
+
+ responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+ assertResponseMessage(responseMessage.get(), duplicateRequestToBeIgnored);
+
+ // Request shall be deleted as soon as it is acted upon
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
@Test(expected = NullPointerException.class)
@@ -487,7 +641,8 @@ public class TestContainerPlacementActions {
new
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0",
"host-1", "1", "host-2")));
ClusterResourceManager.Callback callback =
mock(ClusterResourceManager.Callback.class);
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = spy(new ContainerManager(state,
clusterResourceManager, true, false));
+ ContainerManager containerManager =
+ spy(new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false));
MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
new MockContainerAllocatorWithHostAffinity(clusterResourceManager,
config, state, containerManager);
ContainerProcessManager cpm = new ContainerProcessManager(
@@ -521,6 +676,13 @@ public class TestContainerPlacementActions {
assertBadRequests("2", "host8", containerManager,
allocatorWithHostAffinity);
}
+  /**
+   * Asserts that a placement response carries the same processor id, deployment id and
+   * destination host as the request it is checked against.
+   *
+   * @param responseMessage response under test (supplies the actual values)
+   * @param requestMessage request the response must match (supplies the expected values)
+   */
+  private void assertResponseMessage(ContainerPlacementResponseMessage responseMessage,
+      ContainerPlacementRequestMessage requestMessage) {
+    // JUnit's contract is assertEquals(expected, actual); the request holds the expected values.
+    assertEquals(requestMessage.getProcessorId(), responseMessage.getProcessorId());
+    assertEquals(requestMessage.getDeploymentId(), responseMessage.getDeploymentId());
+    assertEquals(requestMessage.getDestinationHost(), responseMessage.getDestinationHost());
+  }
+
private void assertBadRequests(String processorId, String destinationHost,
ContainerManager containerManager,
ContainerAllocator allocator) {
ContainerPlacementRequestMessage requestMessage =
@@ -529,5 +691,14 @@ public class TestContainerPlacementActions {
ContainerPlacementMetadata metadata =
containerManager.registerContainerPlacementActionForTest(requestMessage,
allocator);
assertNull(metadata);
+
+ Optional<ContainerPlacementResponseMessage> responseMessage =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid());
+
+ assertTrue(responseMessage.isPresent());
+ assertEquals(responseMessage.get().getStatusCode(),
ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+ assertResponseMessage(responseMessage.get(), requestMessage);
+ // Request shall be deleted as soon as it is acted upon
+
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index eca6d6c..14771e7 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -28,12 +28,15 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -77,9 +80,12 @@ public class TestContainerProcessManager {
put("systems.test-system.samza.factory",
"org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde",
"org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde",
"org.apache.samza.serializers.JsonSerde");
+ put("job.name", "test-job");
+ put("job.coordinator.system", "test-kafka");
}
};
private Config config = new MapConfig(configVals);
+ private ContainerPlacementMetadataStore containerPlacementMetadataStore;
private Config getConfig() {
Map<String, String> map = new HashMap<>();
@@ -122,6 +128,11 @@ public class TestContainerProcessManager {
@Before
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new
ServletHolder(DefaultServlet.class));
+ CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new
CoordinatorStreamStoreTestUtil(config);
+ CoordinatorStreamStore coordinatorStreamStore =
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+ coordinatorStreamStore.init();
+ containerPlacementMetadataStore = new
ContainerPlacementMetadataStore(coordinatorStreamStore);
+ containerPlacementMetadataStore.start();
}
private Field getPrivateFieldFromCpm(String fieldName,
ContainerProcessManager object) throws Exception {
@@ -141,7 +152,8 @@ public class TestContainerProcessManager {
SamzaApplicationState state = new
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state,
clusterResourceManager, true, false);
+ ContainerManager containerManager =
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, false);
ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(new
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
@@ -185,7 +197,8 @@ public class TestContainerProcessManager {
ClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state,
clusterResourceManager, Optional.empty());
@@ -252,7 +265,8 @@ public class TestContainerProcessManager {
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -299,7 +313,8 @@ public class TestContainerProcessManager {
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -397,7 +412,8 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -482,7 +498,8 @@ public class TestContainerProcessManager {
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -591,7 +608,8 @@ public class TestContainerProcessManager {
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(conf));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -631,7 +649,7 @@ public class TestContainerProcessManager {
configMap.putAll(getConfig());
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state,
clusterResourceManager,
+ ContainerManager containerManager = new
ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
@@ -666,7 +684,7 @@ public class TestContainerProcessManager {
"1", "host2")));
MockClusterResourceManagerCallback callback = new
MockClusterResourceManagerCallback();
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
- ContainerManager containerManager = new ContainerManager(state,
clusterResourceManager,
+ ContainerManager containerManager = new
ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
MockContainerAllocatorWithHostAffinity allocator = new
MockContainerAllocatorWithHostAffinity(
@@ -731,7 +749,8 @@ public class TestContainerProcessManager {
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(new MapConfig(conf)));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -805,7 +824,8 @@ public class TestContainerProcessManager {
MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new
ClusterManagerConfig(new MapConfig(config)));
ContainerManager containerManager =
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);
+ new ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false);
MockContainerAllocatorWithoutHostAffinity allocator = new
MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
@@ -897,7 +917,8 @@ public class TestContainerProcessManager {
private ContainerProcessManager
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig,
SamzaApplicationState state,
ClusterResourceManager clusterResourceManager,
Optional<ContainerAllocator> allocator) {
- return new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(), clusterResourceManager, allocator,
- new ContainerManager(state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false));
+ return new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(), clusterResourceManager,
+ allocator, new ContainerManager(containerPlacementMetadataStore,
state, clusterResourceManager,
+ clusterManagerConfig.getHostAffinityEnabled(), false));
}
}