This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d022167 SAMZA-2378: Container Placements support for Standby
containers enabled jobs (#1281)
d022167 is described below
commit d02216726e03695096b680f0f5bff9c0d6ca2e6c
Author: Sanil Jain <[email protected]>
AuthorDate: Fri Feb 28 14:37:45 2020 -0800
SAMZA-2378: Container Placements support for Standby containers enabled
jobs (#1281)
Container Placements support for Standby containers enabled jobs
---
.../samza/clustermanager/ContainerManager.java | 97 ++++++++---
.../clustermanager/ContainerProcessManager.java | 2 +-
.../clustermanager/SamzaApplicationState.java | 5 +
.../clustermanager/StandbyContainerManager.java | 42 +++--
.../metrics/ContainerProcessManagerMetrics.scala | 2 +
.../TestContainerPlacementActions.java | 178 ++++++++++++++++++++-
.../samza/clustermanager/TestStandbyAllocator.java | 12 +-
7 files changed, 296 insertions(+), 42 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index c924447..3e3a060 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory;
* ContainerManager encapsulates logic and state related to container
placement actions like move, restarts for active container
* if issued externally.
*
- * TODO SAMZA-2378: Container Placements for Standby containers enabled jobs
- * SAMZA-2379: Container Placements for job running in degraded state
+ * TODO SAMZA-2379: Container Placements for job running in degraded state
*/
public class ContainerManager {
@@ -136,7 +135,22 @@ public class ContainerManager {
LOG.info("Waiting for running container to shutdown due to existing ContainerPlacement action {}", actionMetaData);
return false;
} else if (actionStatus == ContainerPlacementMetadata.ContainerStatus.STOPPED) {
- allocator.runStreamProcessor(request, preferredHost);
+ // If the job has standby containers enabled, always check standby constraints before issuing a start on container
+ // Note: Always check constraints against allocated resource, since preferred host can be ANY_HOST as well
+ if (standbyContainerManager.isPresent() && !standbyContainerManager.get().checkStandbyConstraints(request.getProcessorId(), allocatedResource.getHost())) {
+ LOG.info(
+ "Starting container {} on host {} does not meet standby constraints, falling back to source host placement metadata: {}",
+ request.getProcessorId(), allocatedResource.getHost(), actionMetaData);
+ // Release the unstartable resource and cancel the resource request tied to it
+ standbyContainerManager.get().releaseUnstartableContainer(request, allocatedResource, preferredHost, resourceRequestState);
+ // Fallback to source host since the new allocated resource does not meet standby constraints
+ allocator.requestResource(processorId, actionMetaData.getSourceHost());
+ markContainerPlacementActionFailed(actionMetaData,
+ String.format("allocated resource %s does not meet standby constraints now, falling back to source host", allocatedResource));
+ } else {
+ LOG.info("Status updated for ContainerPlacement action: {}", actionMetaData);
+ allocator.runStreamProcessor(request, preferredHost);
+ }
return true;
}
}
@@ -225,13 +239,13 @@ public class ContainerManager {
*
* @param processorId logical processor id of container 0,1,2
*/
- void handleContainerLaunchSuccess(String processorId) {
+ void handleContainerLaunchSuccess(String processorId, String containerHost) {
if (hasActiveContainerPlacementAction(processorId)) {
ContainerPlacementMetadata metadata =
getPlacementActionMetadata(processorId).get();
// Mark the active container running again and dispatch a response
metadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.RUNNING);
updateContainerPlacementActionStatus(metadata,
ContainerPlacementMessage.StatusCode.SUCCEEDED,
- "Successfully completed the container placement action");
+ "Successfully completed the container placement action started
container on host " + containerHost);
}
}
@@ -296,7 +310,6 @@ public class ContainerManager {
*
* When host affinity is disabled a move / restart is only allowed on
ANY_HOST
* When host affinity is enabled move / restart is allowed on specific or
ANY_HOST
- * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
*
* Container placement requests are tied to deploymentId which is currently
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
* On job restarts container placement requests queued for the previous
deployment are deleted using this
@@ -313,14 +326,15 @@ public class ContainerManager {
if (!deQueueAction(requestMessage)) {
return;
}
- Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
validatePlacementAction(requestMessage);
LOG.info("ContainerPlacement action is de-queued metadata: {}",
requestMessage);
+ Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
validatePlacementAction(requestMessage);
// Action is de-queued upon so we record it in the cache
placementRequestsCache.put(requestMessage.getUuid());
// Remove the request message from metastore since this message is already
acted upon
containerPlacementMetadataStore.deleteContainerPlacementRequestMessage(requestMessage.getUuid());
// Request is bad just update the response on message & return
if (actionStatus.getKey() ==
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+ LOG.info("Status updated for ContainerPlacement action request: {}
response: {}", requestMessage, actionStatus.getValue());
writeContainerPlacementResponseMessage(requestMessage,
actionStatus.getKey(), actionStatus.getValue());
return;
}
@@ -335,14 +349,17 @@ public class ContainerManager {
destinationHost = ANY_HOST;
}
- SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(processorId, destinationHost);
+ // Register metadata
ContainerPlacementMetadata actionMetaData = new
ContainerPlacementMetadata(requestMessage, currentResource.getHost());
+ actions.put(processorId, actionMetaData);
+
+ SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(processorId, destinationHost);
// Record the resource request for monitoring
actionMetaData.recordResourceRequest(resourceRequest);
actions.put(processorId, actionMetaData);
updateContainerPlacementActionStatus(actionMetaData,
ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources
requested");
containerAllocator.issueResourceRequest(resourceRequest);
- LOG.info("Issued resource request for preferred resources for
ContainerPlacement action: {}", actionMetaData);
+
}
/**
@@ -370,6 +387,7 @@ public class ContainerManager {
}
private void markContainerPlacementActionFailed(ContainerPlacementMetadata
metaData, String failureMessage) {
+ samzaApplicationState.failedContainerPlacementActions.incrementAndGet();
updateContainerPlacementActionStatus(metaData,
ContainerPlacementMessage.StatusCode.FAILED, failureMessage);
}
@@ -412,16 +430,17 @@ public class ContainerManager {
}
/**
- * If there is an existing inflight request or container is pending a start, the container placement action shall wait
- * until this in-flight action is complete
+ * These are the scenarios in which a placement action must wait for an existing action to complete before it is executed
+ * 1. If there is a placement request in progress on the container itself
+ * 2. If standby containers are enabled and a placement request is in progress on its active container (for a standby) or on any of its standby containers (for an active container)
+ * 3. If the container itself is pending a start
*
* @param requestMessage container placement request message
* @return true if action should be taken right now, false if it needs to wait to be taken in future
*/
private boolean deQueueAction(ContainerPlacementRequestMessage requestMessage) {
// Do not dequeue action wait for the in-flight action to complete
- if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
- LOG.info("ContainerPlacement request: {} is en-queued because container has an in-progress placement action", requestMessage);
+ if (checkIfActiveOrStandbyContainerHasActivePlacementAction(requestMessage)) {
return false;
}
// Do not dequeue the action wait for the container to come to a running state
@@ -435,32 +454,68 @@ public class ContainerManager {
/**
* A valid container placement action needs a valid processor id. Duplicate
actions are handled by de-duping on uuid.
+ * If standby containers are enabled destination host requested must meet
standby constraints
*
* @param requestMessage container placement request message
* @return Pair<ContainerPlacementMessage.StatusCode, String> which is
status code & response suggesting if the request is valid
*/
private Pair<ContainerPlacementMessage.StatusCode, String>
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
- String errorMessagePrefix = String.format("ContainerPlacement request: %s
is rejected due to", requestMessage);
+ String errorMessagePrefix =
ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: %s";
Boolean invalidAction = false;
String errorMessage = null;
- if (standbyContainerManager.isPresent()) {
- errorMessage = String.format("%s not supported for hot standby enabled",
errorMessagePrefix);
+ if
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
&&
+
!samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId())
+ ) {
+ errorMessage = String.format(errorMessagePrefix, "invalid processor id
neither in running or pending processors");
invalidAction = true;
} else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
- errorMessage = String.format("%s duplicate UUID of the request, please
retry", errorMessagePrefix);
+ errorMessage = String.format(errorMessagePrefix, "duplicate UUID of the
request, please retry");
invalidAction = true;
- } else if (Integer.parseInt(requestMessage.getProcessorId()) >=
samzaApplicationState.processorCount.get()
- ) {
- errorMessage = String.format("%s invalid processor id",
errorMessagePrefix);
+ } else if (standbyContainerManager.isPresent() &&
!standbyContainerManager.get()
+ .checkStandbyConstraints(requestMessage.getProcessorId(),
requestMessage.getDestinationHost())) {
+ errorMessage = String.format(errorMessagePrefix, "destination host does
not meet standby constraints");
invalidAction = true;
}
if (invalidAction) {
- LOG.info(errorMessage);
return new
ImmutablePair<>(ContainerPlacementMessage.StatusCode.BAD_REQUEST, errorMessage);
}
return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.ACCEPTED,
"Request is accepted");
}
+ /**
+ * Checks if there is an in-progress container placement action on the container itself or, if standby containers
+ * are enabled, on its active container (for a standby) or on any of its standby containers (for an active container)
+ */
+ private boolean checkIfActiveOrStandbyContainerHasActivePlacementAction(ContainerPlacementRequestMessage request) {
+ String processorId = request.getProcessorId();
+ // Container itself has active container placement actions
+ if (hasActiveContainerPlacementAction(processorId)) {
+ LOG.info("ContainerPlacement request: {} is en-queued because container has an in-progress placement action", request);
+ return true;
+ }
+
+ if (standbyContainerManager.isPresent()) {
+ // If requested placement action is on a standby container and its active container has a placement request,
+ // this request shall not be de-queued until in-flight action on active container is complete
+ if (StandbyTaskUtil.isStandbyContainer(processorId) && hasActiveContainerPlacementAction(
+ StandbyTaskUtil.getActiveContainerId(processorId))) {
+ LOG.info("ContainerPlacement request: {} is en-queued because its active container has an in-progress placement action", request);
+ return true;
+ }
+ // If requested placement action is on an active container and any of its standby containers has a placement request,
+ // wait for that action to complete (the id is not validated yet at de-queue time, hence the null check)
+ if (!StandbyTaskUtil.isStandbyContainer(processorId) && standbyContainerManager.get().getStandbyList(processorId) != null) {
+ for (String standby : standbyContainerManager.get().getStandbyList(processorId)) {
+ if (hasActiveContainerPlacementAction(standby)) {
+ LOG.info("ContainerPlacement request: {} is en-queued because one of its standby replica has an in-progress placement action", request);
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 7f99159..1535b2d 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -405,7 +405,7 @@ public class ContainerProcessManager implements
ClusterResourceManager.Callback
if (state.neededProcessors.decrementAndGet() == 0) {
state.jobHealthy.set(true);
}
- containerManager.handleContainerLaunchSuccess(processorId);
+ containerManager.handleContainerLaunchSuccess(processorId,
containerHost);
} else {
LOG.warn("Did not find a pending Processor ID for Container ID: {} on
host: {}. " +
"Ignoring invalid/redundant notification.", containerId,
containerHost);
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index ef09da1..c3c935d 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -160,6 +160,11 @@ public class SamzaApplicationState {
*/
public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
+ /**
+ * Number of occurrences of failed container placement actions
+ */
+ public final AtomicInteger failedContainerPlacementActions = new
AtomicInteger(0);
+
public SamzaApplicationState(JobModelManager jobModelManager) {
this.jobModelManager = jobModelManager;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 7ea429d..a9d298d 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -332,14 +332,12 @@ public class StandbyContainerManager {
/**
* Check if matching this SamzaResourceRequest to the given resource, meets
all standby-container container constraints.
*
- * @param request The resource request to match.
- * @param samzaResource The samzaResource to potentially match the resource
to.
+ * @param containerIdToStart logical id of the container to start
+ * @param host potential host to start the container on
* @return
*/
- private boolean checkStandbyConstraints(SamzaResourceRequest request,
SamzaResource samzaResource) {
- String containerIDToStart = request.getProcessorId();
- String host = samzaResource.getHost();
- List<String> containerIDsForStandbyConstraints =
this.standbyContainerConstraints.get(containerIDToStart);
+ boolean checkStandbyConstraints(String containerIdToStart, String host) {
+ List<String> containerIDsForStandbyConstraints =
this.standbyContainerConstraints.get(containerIdToStart);
// Check if any of these conflicting containers are running/launching on
host
for (String containerID : containerIDsForStandbyConstraints) {
@@ -348,7 +346,7 @@ public class StandbyContainerManager {
// return false if a conflicting container is pending for launch on the
host
if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container
{} is already scheduled on this host",
- containerIDToStart, samzaResource.getHost(), containerID);
+ containerIdToStart, host, containerID);
return false;
}
@@ -356,7 +354,7 @@ public class StandbyContainerManager {
resource = samzaApplicationState.runningProcessors.get(containerID);
if (resource != null && resource.getHost().equals(host)) {
log.info("Container {} cannot be started on host {} because container
{} is already running on this host",
- containerIDToStart, samzaResource.getHost(), containerID);
+ containerIdToStart, host, containerID);
return false;
}
}
@@ -375,7 +373,7 @@ public class StandbyContainerManager {
ResourceRequestState resourceRequestState) {
String containerID = request.getProcessorId();
- if (checkStandbyConstraints(request, samzaResource)) {
+ if (checkStandbyConstraints(containerID, samzaResource.getHost())) {
// This resource can be used to launch this container
log.info("Running container {} on {} meets standby constraints,
preferredHost = {}", containerID,
samzaResource.getHost(), preferredHost);
@@ -385,8 +383,7 @@ public class StandbyContainerManager {
log.info(
"Running standby container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource, and making a new
ANY_HOST request",
containerID, samzaResource.getHost());
- resourceRequestState.releaseUnstartableContainer(samzaResource,
preferredHost);
- resourceRequestState.cancelResourceRequest(request);
+ releaseUnstartableContainer(request, samzaResource, preferredHost,
resourceRequestState);
containerAllocator.requestResource(containerID,
ResourceRequestState.ANY_HOST);
samzaApplicationState.failedStandbyAllocations.incrementAndGet();
} else {
@@ -394,9 +391,7 @@ public class StandbyContainerManager {
log.warn(
"Running active container {} on host {} does not meet standby
constraints, cancelling resource request, releasing resource",
containerID, samzaResource.getHost());
- resourceRequestState.releaseUnstartableContainer(samzaResource,
preferredHost);
- resourceRequestState.cancelResourceRequest(request);
-
+ releaseUnstartableContainer(request, samzaResource, preferredHost,
resourceRequestState);
Optional<FailoverMetadata> failoverMetadata =
getFailoverMetadata(request);
String lastKnownResourceID =
failoverMetadata.isPresent() ?
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
@@ -425,6 +420,25 @@ public class StandbyContainerManager {
}
}
+ /**
+ * Fetches the list of standby containers for an active container
+ * @param activeContainerId logical id of the container ex: 0,1,2
+ * @return list of standby containers ex: for active container 0: {0-0, 0-1}; an empty list if the id is unknown
+ */
+ List<String> getStandbyList(String activeContainerId) {
+ return this.standbyContainerConstraints.getOrDefault(activeContainerId, java.util.Collections.emptyList());
+ }
+
+ /**
+ * Releases an un-startable resource immediately and cancels the resource request corresponding to it,
+ * delegating both steps to the given {@link ResourceRequestState}
+ */
+ void releaseUnstartableContainer(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
+ ResourceRequestState resourceRequestState) {
+ resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
+ resourceRequestState.cancelResourceRequest(request);
+ }
+
// Handle an expired resource request that was made for placing a standby
container
private void handleExpiredRequestForStandbyContainer(String containerID,
SamzaResourceRequest request,
Optional<SamzaResource> alternativeResource, ContainerAllocator
containerAllocator,
diff --git
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 7bde882..91fec28 100644
---
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -60,6 +60,8 @@ class ContainerProcessManagerMetrics(val config: Config,
val mFailoversToAnyHost = newGauge("failovers-to-any-host", () =>
state.failoversToAnyHost.get())
val mFailoversToStandby = newGauge("failovers-to-standby", () =>
state.failoversToStandby.get())
+ val mFailedContainerPlacementActions =
newGauge("failed-container-placements-actions", () =>
state.failedContainerPlacementActions.get())
+
val mContainerMemoryMb = newGauge("container-memory-mb", () =>
clusterManagerConfig.getContainerMemoryMb)
val mContainerCpuCores = newGauge("container-cpu-cores", () =>
clusterManagerConfig.getNumCores)
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index b433cba..9497f8e 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.apache.commons.lang3.RandomStringUtils;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
@@ -45,6 +46,7 @@ import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
@@ -83,6 +85,7 @@ public class TestContainerPlacementActions {
put("job.name", "test-job");
put("job.coordinator.system", "test-kafka");
put("app.run.id", "appAttempt-001");
+ put("job.standbytasks.replication.factor", "2");
}
};
@@ -126,6 +129,19 @@ public class TestContainerPlacementActions {
mockLocalityManager, this.server);
}
+ private JobModelManager getJobModelManagerWithHostAffinityWithStandby(Map<String, String> containerIdToHost) {
+ // Build the container locality (container id -> {HOST_KEY: host}) served by the mocked LocalityManager
+ Map<String, Map<String, String>> localityMap = new HashMap<>();
+ containerIdToHost.forEach((containerId, host) -> {
+ localityMap.put(containerId, ImmutableMap.of(SetContainerHostMapping.HOST_KEY, host));
+ });
+ LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
+ // Generate a JobModel with 2 containers, 2 tasks and standby replication factor 2 (containers 0, 1, 0-0, 1-0)
+ JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 2, 2, Optional.of(mockLocalityManager));
+ return new JobModelManager(standbyJobModel, server, null);
+ }
+
@Before
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new
ServletHolder(DefaultServlet.class));
@@ -148,8 +164,20 @@ public class TestContainerPlacementActions {
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
}
+ public void setupStandby() throws Exception {
+ state = new
SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0",
"host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
+ callback = mock(ClusterResourceManager.Callback.class);
+ MockClusterResourceManager clusterResourceManager = new
MockClusterResourceManager(callback, state);
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ // Enable standby
+ containerManager = spy(new
ContainerManager(containerPlacementMetadataStore, state,
clusterResourceManager, true, true));
+ allocatorWithHostAffinity = new
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state,
containerManager);
+ cpm = new ContainerProcessManager(clusterManagerConfig, state, new
MetricsRegistryMap(),
+ clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
+ }
+
@Test(timeout = 10000)
- public void testContainerSuccessfulMoveAction() throws Exception {
+ public void testContainerSuccessfulMoveActionWithoutStandby() throws
Exception {
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
new Thread(() -> {
@@ -676,6 +704,154 @@ public class TestContainerPlacementActions {
assertBadRequests("2", "host8", containerManager,
allocatorWithHostAffinity);
}
+
+ @Test(timeout = 30000)
+ public void testContainerSuccessfulMoveActionWithStandbyEnabled() throws
Exception {
+ // Setup standby for job
+ setupStandby();
+
+ // Spawn a Request Allocator Thread
+ Thread requestAllocatorThread = new Thread(
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config)),
+ "ContainerPlacement Request Allocator Thread");
+ requestAllocatorThread.start();
+
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onResourcesAvailable((List<SamzaResource>) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onResourcesAvailable(anyList());
+
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onStreamProcessorLaunchSuccess(any());
+
+ doAnswer(new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ new Thread(() -> {
+ Object[] args = invocation.getArguments();
+ cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]);
+ }, "AMRMClientAsync").start();
+ return null;
+ }
+ }).when(callback).onResourcesCompleted(anyList());
+
+ cpm.start();
+
+ if (!allocatorWithHostAffinity.awaitContainersStart(4, 4,
TimeUnit.SECONDS)) {
+ fail("timed out waiting for the containers to start");
+ }
+
+ while (state.runningProcessors.size() != 4) {
+ Thread.sleep(100);
+ }
+
+ // First running state of the app
+ Consumer<SamzaApplicationState> stateCheck = (SamzaApplicationState state)
-> {
+ assertEquals(4, state.runningProcessors.size());
+ assertEquals("host-1", state.runningProcessors.get("0").getHost());
+ assertEquals("host-2", state.runningProcessors.get("1").getHost());
+ assertEquals("host-2", state.runningProcessors.get("0-0").getHost());
+ assertEquals("host-1", state.runningProcessors.get("1-0").getHost());
+ assertEquals(4, state.preferredHostRequests.get());
+ assertEquals(0, state.failedStandbyAllocations.get());
+ assertEquals(0, state.anyHostRequests.get());
+ };
+ // Invoke a state check
+ stateCheck.accept(state);
+
+ // Initiate a bad container placement action to move a standby to its
active host and vice versa
+ // which should fail because this violates standby constraints
+ UUID badRequest1 =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0-0", "host-1",
+ null, System.currentTimeMillis());
+
+ UUID badRequest2 =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0", "host-2",
+ null, System.currentTimeMillis() + 100);
+
+ // Wait for the ControlActions to complete
+ while (true) {
+ if
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).isPresent()
&&
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).get().getStatusCode()
+ == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+ break;
+ }
+ Thread.sleep(Duration.ofSeconds(5).toMillis());
+ }
+
+ // App running state should remain the same
+ stateCheck.accept(state);
+
+ Optional<ContainerPlacementResponseMessage> responseMessageMove1 =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest1);
+ Optional<ContainerPlacementResponseMessage> responseMessageMove2 =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2);
+
+ // Assert that both the requests were bad
+ assertTrue(responseMessageMove1.isPresent());
+ assertEquals(ContainerPlacementMessage.StatusCode.BAD_REQUEST, responseMessageMove1.get().getStatusCode());
+ assertTrue(responseMessageMove2.isPresent());
+ assertEquals(ContainerPlacementMessage.StatusCode.BAD_REQUEST, responseMessageMove2.get().getStatusCode());
+
+
+ // Initiate a standby failover which is supposed to be done in two steps
+ // Step 1. Move the standby container to any other host: move 0-0 to say host-3
+ // Step 2. Move the active container to the standby's original host: move 0 to host-2
+
+ // Action will get executed first
+ UUID standbyMoveRequest =
+
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0-0", "host-3",
+ null, System.currentTimeMillis());
+ // Action will get executed when standbyMoveRequest move request is
complete
+ UUID activeMoveRequest =
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
"0", "host-2", null,
+ System.currentTimeMillis() + 100);
+
+ // Wait for the ControlActions to complete
+ while (true) {
+ if
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).isPresent()
&&
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).get().getStatusCode()
+ == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+ break;
+ }
+ Thread.sleep(Duration.ofSeconds(5).toMillis());
+ }
+
+ assertEquals(4, state.runningProcessors.size());
+ assertEquals("host-2", state.runningProcessors.get("0").getHost());
+ assertEquals("host-2", state.runningProcessors.get("1").getHost());
+ assertEquals("host-3", state.runningProcessors.get("0-0").getHost());
+ assertEquals("host-1", state.runningProcessors.get("1-0").getHost());
+ assertEquals(6, state.preferredHostRequests.get());
+ assertEquals(0, state.failedStandbyAllocations.get());
+ assertEquals(0, state.anyHostRequests.get());
+
+
+ Optional<ContainerPlacementResponseMessage> responseStandbyMove =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(standbyMoveRequest);
+
+ Optional<ContainerPlacementResponseMessage> responseActiveMove =
+
containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest);
+
+ assertTrue(responseStandbyMove.isPresent());
+ assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, responseStandbyMove.get().getStatusCode());
+
+ assertTrue(responseActiveMove.isPresent());
+ assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, responseActiveMove.get().getStatusCode());
+
+ // Requests should be deleted as soon as they are accepted / being acted upon
+ assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
+ assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+ }
+
private void assertResponseMessage(ContainerPlacementResponseMessage
responseMessage,
ContainerPlacementRequestMessage requestMessage) {
assertEquals(responseMessage.getProcessorId(),
requestMessage.getProcessorId());
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
index c075f14..459f39d 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -22,8 +22,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.samza.Partition;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
@@ -38,7 +40,7 @@ public class TestStandbyAllocator {
@Test
public void testWithNoStandby() {
- JobModel jobModel = getJobModelWithStandby(1, 1, 1);
+ JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
List<String> containerConstraints =
StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
Assert.assertEquals("Constrained container count should be 0", 0,
containerConstraints.size());
}
@@ -57,7 +59,7 @@ public class TestStandbyAllocator {
public void testWithStandby(int nContainers, int nTasks, int
replicationFactor) {
- JobModel jobModel = getJobModelWithStandby(nContainers, nTasks,
replicationFactor);
+ JobModel jobModel = getJobModelWithStandby(nContainers, nTasks,
replicationFactor, Optional.empty());
for (String containerID : jobModel.getContainers().keySet()) {
List<String> containerConstraints =
StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
@@ -79,7 +81,7 @@ public class TestStandbyAllocator {
}
// Helper method to create a jobmodel with given number of containers, tasks
and replication factor
- private JobModel getJobModelWithStandby(int nContainers, int nTasks, int
replicationFactor) {
+ public static JobModel getJobModelWithStandby(int nContainers, int nTasks,
int replicationFactor, Optional<LocalityManager> localityManager) {
Map<String, ContainerModel> containerModels = new HashMap<>();
int taskID = 0;
@@ -102,7 +104,7 @@ public class TestStandbyAllocator {
}
containerModels.putAll(standbyContainerModels);
- return new JobModel(new MapConfig(), containerModels);
+ return new JobModel(new MapConfig(), containerModels,
localityManager.orElse(null));
}
// Helper method that creates a taskmodel with one input ssp
@@ -113,7 +115,7 @@ public class TestStandbyAllocator {
}
// Helper method to create standby-taskModels from active-taskModels
- private Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel>
tasks, int replicaNum) {
+ private static Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName,
TaskModel> tasks, int replicaNum) {
Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
tasks.forEach((taskName, taskModel) -> {
TaskName standbyTaskName =
StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);