rmatharu commented on a change in pull request #1170: Samza-2330: Handle
expired resource request for Container allocator when host affinity is disabled
URL: https://github.com/apache/samza/pull/1170#discussion_r330823454
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
##########
@@ -221,7 +221,10 @@ void assignResourceRequests() {
updateExpiryMetrics(request);
if (hostAffinityEnabled) {
handleExpiredRequestWithHostAffinityEnabled(processorId,
preferredHost, request);
+ } else {
+ handleExpiredRequestWithHostAffinityDisabled(processorId, request);
Review comment:
It might be better to have a single method,
handleExpiredRequest(processorId, preferredHost, request), that covers both
the host-affinity-enabled and host-affinity-disabled cases. Perhaps along
these lines:
```
if (expired) {
  updateExpiryMetrics(request);
  handleExpiredRequest(processorId, preferredHost, request);
}
```
```
/**
 * Handles expired resource requests by
 *   a. invoking the standby container manager (if enabled),
 *   b. running on another container resource if available, or
 *   c. issuing a new resource request for any-host and cancelling the
 *      existing one.
 * Note: This behavior holds regardless of whether host-affinity is enabled.
 */
void handleExpiredRequest(String processorId, String preferredHost,
    SamzaResourceRequest request) {
  boolean resourceAvailableOnAnyHost =
      hasAllocatedResource(ResourceRequestState.ANY_HOST);
  if (hostAffinityEnabled && standbyContainerManager.isPresent()) {
    standbyContainerManager.get()
        .handleExpiredResourceRequest(processorId, request,
            Optional.ofNullable(peekAllocatedResource(ResourceRequestState.ANY_HOST)),
            this, resourceRequestState);
  } else if (resourceAvailableOnAnyHost) {
    LOG.info("Request for Processor ID: {} on host: {} has expired. "
        + "Running on ANY_HOST", processorId, preferredHost);
    runStreamProcessor(request, ResourceRequestState.ANY_HOST);
  } else {
    LOG.info("Request for Processor ID: {} on host: {} has expired. "
        + "Requesting additional resources on ANY_HOST.",
        processorId, preferredHost);
    resourceRequestState.cancelResourceRequest(request);
    requestResource(processorId, ResourceRequestState.ANY_HOST);
  }
}
```
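If it helps with testing, the branch order above could also be pulled out into a pure function so that the three outcomes can be checked without a cluster. This is only a rough sketch: `ExpiredRequestPolicy`, `Action`, and `decide` are hypothetical names made up for illustration and are not part of Samza.

```java
public class ExpiredRequestPolicy {

  /** Hypothetical labels for the three outcomes described in the Javadoc above. */
  public enum Action {
    DELEGATE_TO_STANDBY_MANAGER,
    RUN_ON_ANY_HOST,
    CANCEL_AND_REQUEST_ANY_HOST
  }

  /**
   * Mirrors the if / else-if / else order of the suggested handleExpiredRequest:
   * the standby manager is consulted first, then an already-allocated ANY_HOST
   * resource is used, and only then is a fresh ANY_HOST request issued.
   */
  public static Action decide(boolean hostAffinityEnabled,
      boolean standbyManagerPresent, boolean resourceAvailableOnAnyHost) {
    if (hostAffinityEnabled && standbyManagerPresent) {
      return Action.DELEGATE_TO_STANDBY_MANAGER;
    } else if (resourceAvailableOnAnyHost) {
      return Action.RUN_ON_ANY_HOST;
    } else {
      return Action.CANCEL_AND_REQUEST_ANY_HOST;
    }
  }

  public static void main(String[] args) {
    // Host affinity disabled, a resource is already allocated on ANY_HOST.
    System.out.println(decide(false, false, true));
    // Host affinity disabled, nothing allocated yet.
    System.out.println(decide(false, false, false));
  }
}
```

With host affinity disabled, the first branch is never taken, so the method reduces to the last two cases, which is the behavior this PR adds.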