Sanil15 commented on a change in pull request #1219: SAMZA-2373: Container
Placement Service (core functionality) for container move and restart
URL: https://github.com/apache/samza/pull/1219#discussion_r352928157
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -161,4 +262,140 @@ void handleExpiredRequestWithHostAffinityEnabled(String processorId, String pref
       allocator.requestResource(processorId, ResourceRequestState.ANY_HOST);
     }
   }
+
+  /**
+   * Registers a container placement action to move the running container to destination host, if destination host is same as the
+   * host on which container is running, container placement action is treated as a restart.
+   *
+   * When host affinity is disabled a move / restart is only allowed on ANY_HOST
+   * When host affinity is enabled move / restart is allowed on specific or ANY_HOST
+   * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
+   *
+   * @param requestMessage request containing logical processor id 0,1,2 and host where container is desired to be moved,
+   *                       acceptable values of this param are any valid hostname or "ANY_HOST"(in this case the request
+   *                       is sent to resource manager for any host)
+   * @param containerAllocator to request physical resources
+   */
+  public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {
+    LOG.info("Received ControlAction request to move or restart container with processor id {} to host {}",
+        requestMessage.getProcessorId(), requestMessage.getDestinationHost());
+    String processorId = requestMessage.getProcessorId();
+    String destinationHost = requestMessage.getDestinationHost();
+    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus =
+        checkValidControlAction(processorId, destinationHost, requestMessage.getUuid());
+
+    // Request is bad just update the response on message & return
+    if (actionStatus.getKey() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+      return;
+    }
+
+    SamzaResource currentResource = samzaApplicationState.runningProcessors.get(processorId);
+    LOG.info("Processor ID: {} matched a active container with deployment ID: {} running on host: {}", processorId,
+        currentResource.getContainerId(), currentResource.getHost());
+
+    if (destinationHost.equals(ANY_HOST) || !hostAffinityEnabled) {
+      LOG.info("Changing the requested host to {} because either it is requested or host affinity is disabled",
+          ResourceRequestState.ANY_HOST);
+      destinationHost = ANY_HOST;
+    }
+
+    SamzaResourceRequest resourceRequest = containerAllocator.getResourceRequest(processorId, destinationHost);
+    Optional<Duration> requestExpiry =
+        requestMessage.getRequestExpiry() != null ? Optional.of(Duration.ofMillis(requestMessage.getRequestExpiry())) : Optional.empty();
+    ContainerPlacementMetadata actionMetaData =
+        new ContainerPlacementMetadata(requestMessage.getUuid(), processorId, currentResource.getContainerId(),
+            currentResource.getHost(), destinationHost, requestExpiry);
+
+    // Record the resource request for monitoring
+    actionMetaData.setActionStatus(ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources requested");
+    actionMetaData.recordResourceRequest(resourceRequest);
+    actions.put(processorId, actionMetaData);
+    containerAllocator.issueResourceRequest(resourceRequest);
+    LOG.info("Container Placement action with metadata {} and issued a request for resources in progress",
+        actionMetaData);
+  }
+
+  @VisibleForTesting
+  ContainerPlacementMetadata registerContainerPlacementActionForTest(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {
+    registerContainerPlacementAction(requestMessage, containerAllocator);
+    if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
+      return getPlacementActionMetadata(requestMessage.getProcessorId()).get();
+    }
+    return null;
+  }
+
+  public Optional<Duration> getActionExpiryTimeout(String processorId) {
+    return hasActiveContainerPlacementAction(processorId) ?
+        getPlacementActionMetadata(processorId).get().getRequestActionExpiryTimeout() : Optional.empty();
+  }
Review comment:
```This method is really confusing: it takes a processorId as input, then
checks whether action metadata is present for that processorId (which, btw,
is no longer deleted?), then gets the timeout on that metadata.```
hasActiveContainerPlacementAction only returns true if an action is in
flight, so I don't see a problem with this. But sure, if it's more readable,
I will do a linear scan and match a SamzaResourceRequest by UUID.
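
For reference, the check-then-get shape of getActionExpiryTimeout could
also be written as a single Optional chain. The following is only a rough
sketch, not the code in this PR: the class ActionExpirySketch, the nested
Metadata type, its inFlight flag, and the register helper are all
hypothetical stand-ins for ContainerManager and ContainerPlacementMetadata,
and it assumes "active" simply means the stored action is still in flight.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these types are the real Samza classes.
class ActionExpirySketch {

  // Stand-in for ContainerPlacementMetadata: only models the in-flight flag and the expiry.
  static final class Metadata {
    private final boolean inFlight;
    private final Optional<Duration> requestExpiry;

    Metadata(boolean inFlight, Optional<Duration> requestExpiry) {
      this.inFlight = inFlight;
      this.requestExpiry = requestExpiry;
    }

    boolean isInFlight() {
      return inFlight;
    }

    Optional<Duration> getRequestActionExpiryTimeout() {
      return requestExpiry;
    }
  }

  // Placement actions keyed by processor id, as in the diff above.
  private final Map<String, Metadata> actions = new ConcurrentHashMap<>();

  // Hypothetical helper: a null expiry means the request carried no timeout.
  void register(String processorId, boolean inFlight, Long expiryMs) {
    Optional<Duration> expiry = Optional.ofNullable(expiryMs).map(Duration::ofMillis);
    actions.put(processorId, new Metadata(inFlight, expiry));
  }

  Optional<Metadata> getPlacementActionMetadata(String processorId) {
    return Optional.ofNullable(actions.get(processorId));
  }

  // Lookup, in-flight check, and timeout extraction in one chain, with no bare get().
  Optional<Duration> getActionExpiryTimeout(String processorId) {
    return getPlacementActionMetadata(processorId)
        .filter(Metadata::isInFlight)
        .flatMap(Metadata::getRequestActionExpiryTimeout);
  }

  public static void main(String[] args) {
    ActionExpirySketch sketch = new ActionExpirySketch();
    sketch.register("0", true, 5000L);   // in flight, with a timeout
    sketch.register("1", true, null);    // in flight, no timeout
    sketch.register("2", false, 5000L);  // finished action, timeout ignored
    System.out.println(sketch.getActionExpiryTimeout("0"));
    System.out.println(sketch.getActionExpiryTimeout("1"));
    System.out.println(sketch.getActionExpiryTimeout("2"));
    System.out.println(sketch.getActionExpiryTimeout("3")); // unknown processor
  }
}
```

The same Optional.ofNullable(...).map(Duration::ofMillis) idiom might also
replace the null-check ternary that builds requestExpiry, assuming
getRequestExpiry() returns a nullable Long.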