Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r352929271
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -161,4 +262,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String
processorId, String pref
allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
}
}
+
+ /**
+ * Registers a container placement action to move the running container to
destination host, if destination host is same as the
+ * host on which container is running, container placement action is treated
as a restart.
+ *
+ * When host affinity is disabled a move / restart is only allowed on
ANY_HOST
+ * When host affinity is enabled move / restart is allowed on specific or
ANY_HOST
+ * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+ *
+ * @param requestMessage request containing logical processor id 0,1,2 and
host where container is desired to be moved,
+ * acceptable values of this param are any valid
hostname or "ANY_HOST"(in this case the request
+ * is sent to resource manager for any host)
+ * @param containerAllocator to request physical resources
+ */
+ public void
registerContainerPlacementAction(ContainerPlacementRequestMessage
requestMessage, ContainerAllocator containerAllocator) {
+ LOG.info("Received ControlAction request to move or restart container with
processor id {} to host {}",
+ requestMessage.getProcessorId(), requestMessage.getDestinationHost());
+ String processorId = requestMessage.getProcessorId();
+ String destinationHost = requestMessage.getDestinationHost();
+ Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
+ checkValidControlAction(processorId, destinationHost,
requestMessage.getUuid());
+
+ // Request is bad just update the response on message & return
+ if (actionStatus.getKey() ==
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+ return;
+ }
+
+ SamzaResource currentResource =
samzaApplicationState.runningProcessors.get(processorId);
+ LOG.info("Processor ID: {} matched a active container with deployment ID:
{} running on host: {}", processorId,
+ currentResource.getContainerId(), currentResource.getHost());
+
+ if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+ LOG.info("Changing the requested host to {} because either it is
requested or host affinity is disabled",
+ ResourceRequestState.ANY_HOST);
+ destinationHost = ANY_HOST;
+ }
+
+ SamzaResourceRequest resourceRequest =
containerAllocator.getResourceRequest(processorId, destinationHost);
+ Optional<Duration> requestExpiry =
+ requestMessage.getRequestExpiry() != null ?
Optional.of(Duration.ofMillis(requestMessage.getRequestExpiry())) :
Optional.empty();
+ ContainerPlacementMetadata actionMetaData =
+ new ContainerPlacementMetadata(requestMessage.getUuid(), processorId,
currentResource.getContainerId(),
+ currentResource.getHost(), destinationHost, requestExpiry);
+
+ // Record the resource request for monitoring
+
actionMetaData.setActionStatus(ContainerPlacementMessage.StatusCode.IN_PROGRESS,
"Preferred Resources requested");
+ actionMetaData.recordResourceRequest(resourceRequest);
+ actions.put(processorId, actionMetaData);
+ containerAllocator.issueResourceRequest(resourceRequest);
+ LOG.info("Container Placement action with metadata {} and issued a request
for resources in progress",
+ actionMetaData);
+ }
+
+ @VisibleForTesting
+ ContainerPlacementMetadata
registerContainerPlacementActionForTest(ContainerPlacementRequestMessage
requestMessage, ContainerAllocator containerAllocator) {
+ registerContainerPlacementAction(requestMessage, containerAllocator);
+ if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
+ return getPlacementActionMetadata(requestMessage.getProcessorId()).get();
+ }
+ return null;
+ }
+
+ public Optional<Duration> getActionExpiryTimeout(String processorId) {
+ return hasActiveContainerPlacementAction(processorId) ?
+
getPlacementActionMetadata(processorId).get().getRequestActionExpiryTimeout() :
Optional.empty();
+ }
+
+ private void handleControlActionFailure(String processorId,
ContainerPlacementMetadata metaData, String failureMessage) {
+ metaData.setActionStatus(ContainerPlacementMessage.StatusCode.FAILED,
failureMessage);
+ LOG.info("Container Placement action failed with metadata {}", metaData);
+ }
+
+ /**
+ * A ContainerPlacementAction is only active if it is either CREATED,
ACCEPTED or IN_PROGRESS
+ */
+ private Boolean hasActiveContainerPlacementAction(String processorId) {
+ Optional<ContainerPlacementMetadata> metadata =
getPlacementActionMetadata(processorId);
+ if (metadata.isPresent()) {
+ ContainerPlacementMessage.StatusCode actionStatus =
metadata.get().getActionStatus();
+ return (actionStatus == ContainerPlacementMessage.StatusCode.CREATED) ||
+ (actionStatus == ContainerPlacementMessage.StatusCode.ACCEPTED) ||
+ (actionStatus == ContainerPlacementMessage.StatusCode.IN_PROGRESS);
Review comment:
Simplified it: I only need to check IN_PROGRESS, so I removed the redundant status checks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services