Sanil15 commented on a change in pull request #1152: SAMZA-2319: Simplify 
Container Allocation logic
URL: https://github.com/apache/samza/pull/1152#discussion_r327380693
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
 ##########
 @@ -121,18 +146,101 @@ public void run() {
 
         Thread.sleep(allocatorSleepIntervalMs);
       } catch (InterruptedException e) {
-        log.warn("Got InterruptedException in AllocatorThread.", e);
+        LOG.warn("Got InterruptedException in AllocatorThread.", e);
         Thread.currentThread().interrupt();
       } catch (Exception e) {
-        log.error("Got unknown Exception in AllocatorThread.", e);
+        LOG.error("Got unknown Exception in AllocatorThread.", e);
       }
     }
   }
 
   /**
    * Assigns resources received from the cluster manager to processors.
+   *
+   * During the run() method, the thread sleeps for allocatorSleepIntervalMs 
ms. It then invokes assignResourceRequests,
+   * and tries to allocate any unsatisfied request that is still in the 
request queue (see {@link ResourceRequestState})
+   * with allocated resources.
+   * When {@code hostAffinityEnabled} is disabled, all allocated resources are 
buffered in the list keyed by "ANY_HOST".
+   * When {@code hostAffinityEnabled} is enabled, all allocated resources are 
buffered in the list keyed by "preferredHost".
+   *
+   * If the requested host is not available, the thread checks to see if the 
request has expired. If it has expired
+   * then two cases are handled separately:
+   *
+   * Case 1: host-affinity is disabled, cancels the current request and issues 
another ANY_HOST request
+   * Case 2: host-affinity is enabled, looks for allocated resources on 
ANY_HOST and issues a container start if available,
+   *         otherwise issues an ANY_HOST request
+   *
+   * In either scenario, if a {@code StandbyContainerManager} is 
present, the allocator transfers the request
+   * to it for checking StandByConstraints
    */
-  protected abstract void assignResourceRequests();
+  protected void assignResourceRequests() {
+    while (hasReadyPendingRequest()) {
+      SamzaResourceRequest request = peekReadyPendingRequest().get();
+      String processorId = request.getProcessorId();
+      String preferredHost = hostAffinityEnabled ? request.getPreferredHost() 
: ResourceRequestState.ANY_HOST;
+      Instant requestCreationTime = request.getRequestTimestamp();
+
 
 Review comment:
   makes it readable

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to