Sanil15 commented on a change in pull request #1152: SAMZA-2319: Simplify
Container Allocation logic
URL: https://github.com/apache/samza/pull/1152#discussion_r327380533
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
##########
@@ -121,18 +146,101 @@ public void run() {
Thread.sleep(allocatorSleepIntervalMs);
} catch (InterruptedException e) {
- log.warn("Got InterruptedException in AllocatorThread.", e);
+ LOG.warn("Got InterruptedException in AllocatorThread.", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
- log.error("Got unknown Exception in AllocatorThread.", e);
+ LOG.error("Got unknown Exception in AllocatorThread.", e);
}
}
}
/**
* Assigns resources received from the cluster manager to processors.
+ *
+ * During the run() method, the thread sleeps for allocatorSleepIntervalMs
ms. It then invokes assignResourceRequests,
+ * and tries to allocate any unsatisfied request that is still in the
request queue ({@link ResourceRequestState})
+ * with allocated resources.
+ * When {@code hostAffinityEnabled} is disabled, all allocated resources are
buffered in the list keyed by "ANY_HOST".
+ * When {@code hostAffinityEnabled} is enabled, all allocated resources are
buffered in the list keyed by "preferredHost".
+ *
+ * If the requested host is not available, the thread checks to see if the
request has expired. If it has expired
+ * then two cases are handled separately
+ *
+ * Case 1: host-affinity is disabled, cancels the current request and issues
another ANY_HOST request
+ * Case 2: host-affinity is enabled, looks for allocated resources on
ANY_HOST and issues a container start if available,
+ * otherwise issues an ANY_HOST request
+ *
+ * In either scenario, if a {@code StandbyContainerManager} is
present, the allocator transfers the request
+ * to it for checking StandByConstraints
*/
- protected abstract void assignResourceRequests();
+ protected void assignResourceRequests() {
+ while (hasReadyPendingRequest()) {
+ SamzaResourceRequest request = peekReadyPendingRequest().get();
+ String processorId = request.getProcessorId();
+ String preferredHost = hostAffinityEnabled ? request.getPreferredHost()
: ResourceRequestState.ANY_HOST;
+ Instant requestCreationTime = request.getRequestTimestamp();
+
+ LOG.info("Handling assignment request for Processor ID: {} on host:
{}.", processorId, preferredHost);
+ if (hasAllocatedResource(preferredHost)) {
+
+ // Found allocated container on preferredHost
+ LOG.info("Found an available container for Processor ID: {} on the
host: {}", processorId, preferredHost);
+
+ // Needs to be only updated when host affinity is enabled
+ if (hostAffinityEnabled) {
+ state.matchedResourceRequests.incrementAndGet();
+ }
+
+ // Try to launch processor on this preferredHost if all standby
constraints are met
Review comment:
sure
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services