kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1231768921


##########
server/src/main/java/org/apache/druid/server/coordinator/loadqueue/HttpLoadQueuePeon.java:
##########
@@ -470,134 +517,127 @@
     return Collections.unmodifiableSet(segmentsMarkedToDrop);
   }
 
-  private abstract class SegmentHolder
+  /**
+   * A request is considered to have timed out if the time elapsed since it was
+   * first sent to the server is greater than the configured load timeout.
+   *
+   * @see DruidCoordinatorConfig#getLoadTimeoutDelay()
+   */
+  private boolean hasRequestTimedOut(SegmentHolder holder, long 
currentTimeMillis)
   {
-    private final DataSegment segment;
-    private final DataSegmentChangeRequest changeRequest;
-    private final List<LoadPeonCallback> callbacks = new ArrayList<>();
-
-    // Time when this request was sent to target server the first time.
-    private volatile long scheduleTime = -1;
-
-    private SegmentHolder(
-        DataSegment segment,
-        DataSegmentChangeRequest changeRequest,
-        LoadPeonCallback callback
-    )
-    {
-      this.segment = segment;
-      this.changeRequest = changeRequest;
+    return holder.isRequestSentToServer()
+           && currentTimeMillis - holder.getFirstRequestMillis()
+              > config.getLoadTimeoutDelay().getMillis();
+  }
 
-      if (callback != null) {
-        this.callbacks.add(callback);
-      }
-    }
+  private void onRequestFailed(SegmentHolder holder, String failureCause)
+  {
+    log.error(
+        "Server[%s] failed segment[%s] request[%s] with cause [%s].",
+        serverId, holder.getSegment().getId(), holder.getAction(), failureCause
+    );
+    onRequestCompleted(holder, QueueStatus.FAILED);
+  }
 
-    public void addCallback(LoadPeonCallback newCallback)
-    {
-      synchronized (callbacks) {
-        if (newCallback != null) {
-          callbacks.add(newCallback);
-        }
-      }
-    }
+  private void onRequestCompleted(SegmentHolder holder, QueueStatus status)
+  {
+    final SegmentAction action = holder.getAction();
+    log.trace(
+        "Server[%s] completed request[%s] on segment[%s] with status[%s].",
+        serverId, action, holder.getSegment().getId(), status
+    );
 
-    public DataSegment getSegment()
-    {
-      return segment;
+    if (holder.isLoad()) {
+      queuedSize.addAndGet(-holder.getSegment().getSize());
     }
+    incrementStat(holder, status);
+    executeCallbacks(holder, status == QueueStatus.SUCCESS);
+  }
 
-    public DataSegmentChangeRequest getChangeRequest()
-    {
-      return changeRequest;
+  private void incrementStat(SegmentHolder holder, QueueStatus status)
+  {
+    stats.add(status.getStatForAction(holder.getAction()), 1);
+    if (status.datasourceStat != null) {
+      stats.addToDatasourceStat(status.datasourceStat, 
holder.getSegment().getDataSource(), 1);
     }
+  }
 
-    public boolean hasTimedOut()
-    {
-      if (scheduleTime < 0) {
-        scheduleTime = System.currentTimeMillis();
-        return false;
-      } else if (System.currentTimeMillis() - scheduleTime > 
config.getLoadTimeoutDelay().getMillis()) {
-        return true;
-      } else {
+  private void executeCallbacks(SegmentHolder holder, boolean success)
+  {
+    callBackExecutor.execute(() -> {
+      for (LoadPeonCallback callback : holder.getCallbacks()) {
+        callback.execute(success);
+      }
+    });
+  }
+
+  /**
+   * Tries to cancel a load/drop operation. An load/drop request can be 
cancelled
+   * only if it has not already been sent to the corresponding server.
+   */
+  @Override
+  public boolean cancelOperation(DataSegment segment)
+  {
+    synchronized (lock) {
+      if (activeRequestSegments.contains(segment)) {
         return false;
       }
-    }
 
-    public void requestSucceeded()
-    {
-      log.trace(
-          "Server[%s] Successfully processed segment[%s] request[%s].",
-          serverId,
-          segment.getId(),
-          changeRequest.getClass().getSimpleName()
-      );
+      // Find the action on this segment, if any
+      final SegmentHolder holder = segmentsToLoad.containsKey(segment)
+                                   ? segmentsToLoad.remove(segment)
+                                   : segmentsToDrop.remove(segment);
+      if (holder == null) {
+        return false;
+      }
 
-      callBackExecutor.execute(() -> {
-        for (LoadPeonCallback callback : callbacks) {
-          if (callback != null) {
-            callback.execute(true);
-          }
-        }
-      });
+      queuedSegments.remove(holder);
+      onRequestCompleted(holder, QueueStatus.CANCELLED);
+      return true;
     }
+  }
 
-    public void requestFailed(String failureCause)
-    {
-      log.error(
-          "Server[%s] Failed segment[%s] request[%s] with cause [%s].",
-          serverId,
-          segment.getId(),
-          changeRequest.getClass().getSimpleName(),
-          failureCause
-      );
-
-      failedAssignCount.getAndIncrement();
+  private enum QueueStatus
+  {
+    ASSIGNED(Stats.SegmentQueue.ASSIGNED_ACTIONS),
+    SUCCESS(Stats.SegmentQueue.COMPLETED_ACTIONS),
+    FAILED(Stats.SegmentQueue.FAILED_ACTIONS),
+    CANCELLED(Stats.SegmentQueue.CANCELLED_ACTIONS);
 
-      callBackExecutor.execute(() -> {
-        for (LoadPeonCallback callback : callbacks) {
-          if (callback != null) {
-            callback.execute(false);
-          }
-        }
-      });
-    }
+    final CoordinatorStat loadStat;
+    final CoordinatorStat moveStat;
+    final CoordinatorStat dropStat;
+    final CoordinatorStat datasourceStat;
 
-    @Override
-    public String toString()
+    QueueStatus()
     {
-      return changeRequest.toString();
+      this(null);
     }
-  }
 
-  private class LoadSegmentHolder extends SegmentHolder
-  {
-    public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback)
+    QueueStatus(CoordinatorStat datasourceStat)
     {
-      super(segment, new SegmentChangeRequestLoad(segment), callback);
-      queuedSize.addAndGet(segment.getSize());
-    }
+      // These stats are not emitted and are tracked for debugging purposes 
only
+      final String prefix = StringUtils.toLowerCase(name());
+      this.loadStat = new CoordinatorStat(prefix + "Load");
+      this.moveStat = new CoordinatorStat(prefix + "Move");
+      this.dropStat = new CoordinatorStat(prefix + "Drop");
 
-    @Override
-    public void requestSucceeded()
-    {
-      queuedSize.addAndGet(-getSegment().getSize());
-      super.requestSucceeded();
+      this.datasourceStat = datasourceStat;
     }
 
-    @Override
-    public void requestFailed(String failureCause)
+    CoordinatorStat getStatForAction(SegmentAction action)
     {
-      queuedSize.addAndGet(-getSegment().getSize());
-      super.requestFailed(failureCause);
-    }
-  }
+      switch (action) {

Review Comment:
   No, it is okay, `MOVE_FROM` operations aren't supposed to come to the peon. 
So the next line will throw an IAE. I can just have the `default` case throw 
the exception too so that `CodeQL` does not complain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to