Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r348859993
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -161,4 +237,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String
processorId, String pref
allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
}
}
+
+ /**
+ * Registers a container placement action to move the running container to
destination host, if destination host is same as the
+ * host on which container is running, container placement action is treated
as a restart.
+ *
+ * When host affinity is disabled a move / restart is only allowed on
ANY_HOST
+ * When host affinity is enabled move / restart is allowed on specific or
ANY_HOST
+ * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+ *
+ * @param processorId logical id of the container 0, 1, 2
+ * @param destinationHost host where container is desired to be moved,
acceptable values of this param are any valid
+ * hostname or "ANY_HOST"(in this case the request is
sent to resource manager for any host)
+ * @param containerAllocator to request physical resources
+ */
+ public ContainerPlacementStatus registerContainerPlacementAction(String
processorId, String destinationHost,
+ ContainerAllocator containerAllocator, Optional<Long> requestExpiry) {
+ LOG.info("Received ControlAction request to move or restart container with
processor id {} to host {}", processorId, destinationHost);
+ ContainerPlacementStatus actionStatus =
checkValidControlAction(processorId, destinationHost);
+ if (actionStatus.status ==
ContainerPlacementStatus.StatusCode.BAD_REQUEST) {
+ return actionStatus;
+ }
+
+ SamzaResource currentResource =
samzaApplicationState.runningProcessors.get(processorId);
+ LOG.info("Processor ID: {} matched a active container with deployment ID:
{} running on host: {}", processorId,
+ currentResource.getContainerId(), currentResource.getHost());
+
+ if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+ LOG.info("Changing the requested host to {} because either it is
requested or host affinity is disabled",
+ ResourceRequestState.ANY_HOST);
+ destinationHost = ANY_HOST;
+ }
+
+ SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(processorId, destinationHost);
+ ContainerPlacementMetadata actionMetaData =
+ new ContainerPlacementMetadata(processorId,
currentResource.getContainerId(), currentResource.getHost(),
+ destinationHost, actionStatus, requestExpiry.isPresent() ?
requestExpiry.get() : DEFAULT_CONTROL_ACTION_EXPIRY);
+
+ // Record the resource request for monitoring
+
actionMetaData.setActionStatus(ContainerPlacementStatus.StatusCode.IN_PROGRESS);
+ actionMetaData.recordResourceRequest(resourceRequest);
+ actions.put(processorId, actionMetaData);
+ // note this also updates state.preferredHost count
+ containerAllocator.issueResourceRequest(resourceRequest);
+ LOG.info("Control action with metadata {} and issued a request for
resources in progress", actionMetaData);
+ return actionStatus;
+ }
+
+ public Optional<Long> getActionExpiryTimeout(String processorId) {
+ return this.actions.containsKey(processorId) ? Optional.of(
+ this.actions.get(processorId).getRequestActionExpiryTimeout()) :
Optional.empty();
+ }
+
+ /**
+ * Handles the container allocation for an existing container placement
action by issuing a stop on the active container and
+ * waiting for the active container to shutdown for a timeout of {@code
actionMetaData#getRequestActionExpiryTimeout}.
+ *
+ * Case 1. If active container fails to stop mark the container placement
action failed.
+ * Case 2. Otherwise once active container shuts down then issue a start for
the container on the preferred host.
+ *
+ * This method is invoked by the allocator thread which waits on notify from
the thread issuing
+ * callbacks i.e {@link ContainerProcessManager}
+ */
+ private void handleContainerAllocationForExistingControlAction(String
processorId, ContainerAllocator allocator,
+ ResourceRequestState state, SamzaResourceRequest request, String
preferredhost) {
+ // check if container is already dead without issuing a stop here, fail
the move request
+ ContainerPlacementMetadata actionMetaData =
getControlActionMetadata(processorId).get();
Review comment:
only called when is present
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services