rmatharu commented on a change in pull request #1281: SAMZA-2378: Container
Placements support for Standby containers enabled jobs
URL: https://github.com/apache/samza/pull/1281#discussion_r382824513
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
##########
@@ -435,32 +459,59 @@ private boolean
deQueueAction(ContainerPlacementRequestMessage requestMessage) {
/**
* A valid container placement action needs a valid processor id. Duplicate
actions are handled by de-duping on uuid.
+ * If standby containers are enabled destination host requested must meet
standby constraints
*
* @param requestMessage container placement request message
* @return Pair<ContainerPlacementMessage.StatusCode, String> which is
status code & response suggesting if the request is valid
*/
private Pair<ContainerPlacementMessage.StatusCode, String>
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
- String errorMessagePrefix = String.format("ContainerPlacement request: %s
is rejected due to", requestMessage);
+ String errorMessagePrefix =
ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: ";
Boolean invalidAction = false;
String errorMessage = null;
- if (standbyContainerManager.isPresent()) {
- errorMessage = String.format("%s not supported for hot standby enabled",
errorMessagePrefix);
+ if
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
&&
+
!samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId())
+ ) {
+ errorMessage = String.format("%s invalid processor id neither in running
or pending processors", errorMessagePrefix);
invalidAction = true;
} else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
errorMessage = String.format("%s duplicate UUID of the request, please
retry", errorMessagePrefix);
invalidAction = true;
- } else if (Integer.parseInt(requestMessage.getProcessorId()) >=
samzaApplicationState.processorCount.get()
- ) {
- errorMessage = String.format("%s invalid processor id",
errorMessagePrefix);
+ } else if (standbyContainerManager.isPresent() &&
!standbyContainerManager.get()
+ .checkStandbyConstraints(requestMessage.getProcessorId(),
requestMessage.getDestinationHost())) {
+ errorMessage = String.format("%s destination host does not meet standby
constraints", errorMessagePrefix);
invalidAction = true;
}
if (invalidAction) {
- LOG.info(errorMessage);
return new
ImmutablePair<>(ContainerPlacementMessage.StatusCode.BAD_REQUEST, errorMessage);
}
return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.ACCEPTED,
"Request is accepted");
}
+ /**
+ * An active and a standby container cannot have a concurrent placement
action in flight; if they do, the actions are taken
+ * serially in the order of timestamps
+ */
+ private boolean checkStandbyOrActiveContainerHasActivePlacementAction(String
processorId) {
+ if (standbyContainerManager.isPresent()) {
+ // If requested placement action is on a standby container and its
active container has a placement request,
+ // this request shall not be de-queued until in-flight action on active
container is complete
+ if (StandbyTaskUtil.isStandbyContainer(processorId) &&
hasActiveContainerPlacementAction(
+ StandbyTaskUtil.getActiveContainerId(processorId))) {
+ return true;
+ }
+ // If requested placement action is on an active container and any of its
standby containers has a placement request,
+ // this request shall not be de-queued until in-flight action on the standby
container is complete
+ if (!StandbyTaskUtil.isStandbyContainer(processorId)) {
+ for (String standby :
standbyContainerManager.get().getStandbyList(processorId)) {
+ if (hasActiveContainerPlacementAction(standby)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
Review comment:
Can this method be simplified, or inlined into hasActiveAction?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services