This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 29f2ef7 SAMZA-2579: Force restart feature for Container Placements
(#1414)
29f2ef7 is described below
commit 29f2ef7cd9169ce8e0b474228c59a3a1da71e5c2
Author: Sanil Jain <[email protected]>
AuthorDate: Mon Aug 24 11:59:26 2020 -0700
SAMZA-2579: Force restart feature for Container Placements (#1414)
Changes: The current restart ability for container placements works in the
following way:
Tries to fetch resources on a host
Stops the active container if resources are accrued
Tries to start the container on the host where resources were accrued
In production at LinkedIn, we have observed the following:
Some jobs are configured to use resources for the peak which leads to no
headroom left on a host for requesting additional resources
This leads to restart requests failing because resources cannot be acquired on
that host
A fix for this is to implement a force-restart utility. In this version we
will stop the container first and then accrue resources. The upside is that we
will at least free up the resources on the host before issuing the resource
request; the downside is that bringing the container back up on that host
becomes a best-effort operation
API Changes: Added new param values to destinationHost param for container
placement request message
LAST_SEEN: Tries to restart a container on last seen host with RESERVE ->
STOP -> MOVE policy
FORCE_RESTART_LAST_SEEN: Tries to restart a container on last seen host
with STOP -> RESERVE -> MOVE policy
---
.../samza/clustermanager/ContainerManager.java | 39 ++++++++++++++++++++--
1 file changed, 37 insertions(+), 2 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 70a050c..2730c0c 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -54,6 +54,8 @@ public class ContainerManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerManager.class);
private static final String ANY_HOST = ResourceRequestState.ANY_HOST;
+ private static final String LAST_SEEN = "LAST_SEEN";
+ private static final String FORCE_RESTART_LAST_SEEN =
"FORCE_RESTART_LAST_SEEN";
private static final int UUID_CACHE_SIZE = 20000;
/**
@@ -366,9 +368,18 @@ public class ContainerManager {
* Container placement requests are tied to deploymentId which is currently
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
* On job restarts container placement requests queued for the previous
deployment are deleted using this
*
+ * All kinds of container placement request except for when destination host
is "FORCE_RESTART_LAST_SEEN" work with
+ * a RESERVE - STOP - START policy, which means resources are accrued first
before issuing a container stop, failure to
+ * do so will leave the running container untouched. Requests with
destination host "FORCE_RESTART_LAST_SEEN" work with
+ * a STOP - RESERVE - START policy, which means the running container is stopped
first and then resource requests are issued; this case
+ * is equivalent to doing a kill -9 on a container
+ *
* @param requestMessage request containing logical processor id 0,1,2 and
host where container is desired to be moved,
- * acceptable values of this param are any valid
hostname or "ANY_HOST"(in this case the request
- * is sent to resource manager for any host)
+ * acceptable values of this param are
+ * - valid hostname
+ * - "ANY_HOST" in this case the request is sent to
resource manager for any host
+ * - "LAST_SEEN" in this case request is sent to
resource manager for last seen host
+ * - "FORCE_RESTART_LAST_SEEN" in this case the running container is
stopped first, then request is sent to resource manager for last seen host
* @param containerAllocator to request physical resources
*/
public void
registerContainerPlacementAction(ContainerPlacementRequestMessage
requestMessage, ContainerAllocator containerAllocator) {
@@ -391,6 +402,30 @@ public class ContainerManager {
return;
}
+ /*
+ * When destination host is {@code FORCE_RESTART_LAST_SEEN} it is treated as
equivalent to a kill -9 operation for the container.
+ * In this scenario the container is stopped first and we fall back to the normal
restart path, so the policy here is
+ * stop - reserve - move
+ */
+ if (destinationHost.equals(FORCE_RESTART_LAST_SEEN)) {
+ LOG.info("Issuing a force restart for Processor ID: {} for
ContainerPlacement action request {}", processorId, requestMessage);
+
clusterResourceManager.stopStreamProcessor(samzaApplicationState.runningProcessors.get(processorId));
+ writeContainerPlacementResponseMessage(requestMessage,
ContainerPlacementMessage.StatusCode.SUCCEEDED,
+ "Successfully issued a stop container request falling back to normal
restart path");
+ return;
+ }
+
+ /**
+ * When destination host is {@code LAST_SEEN} it is treated as a restart
request on the host where the container is running
+ * or was last seen. The policy in this case is reserve - stop -
move, which means resources are reserved first;
+ * only if resources are accrued is the active container stopped and a
start issued for it on the acquired resource
+ */
+ if (destinationHost.equals(LAST_SEEN)) {
+ String lastSeenHost = getSourceHostForContainer(requestMessage);
+ LOG.info("Changing the requested host for placement action to {} because
requested host is LAST_SEEN", lastSeenHost);
+ destinationHost = lastSeenHost;
+ }
+
// TODO: SAMZA-2457: Allow host affinity disabled jobs to move containers
to specific host
if (!hostAffinityEnabled) {
LOG.info("Changing the requested host for placement action to {} because
host affinity is disabled", ResourceRequestState.ANY_HOST);