jtuglu1 commented on code in PR #19091:
URL: https://github.com/apache/druid/pull/19091#discussion_r2906970346


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
     return false;
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+   * that is currently waiting to publish offsets for the given partitions.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(
+      String supervisorId,
+      String taskId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    try {
+      InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+      if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+        throw InvalidInput.exception("Start metadata[%s] is not streaming metadata", supervisorId);

Review Comment:
   Not sure when or how this could occur, but it might be worth logging the actual type here.
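
   A minimal sketch of what such a message could look like, using only plain Java. `MetadataTypeMessage` and `describe` are hypothetical names for illustration and are not part of Druid or its `InvalidInput` API:

```java
/**
 * Hypothetical helper, not part of Druid: builds an error message that
 * includes the runtime type of the offending metadata object.
 */
public class MetadataTypeMessage
{
  public static String describe(String supervisorId, Object startMetadata)
  {
    // Guard against null so that building the message itself cannot fail.
    final String type = startMetadata == null ? "null" : startMetadata.getClass().getName();
    return String.format(
        "Start metadata of type[%s] for supervisor[%s] is not streaming metadata",
        type,
        supervisorId
    );
  }
}
```

   The same idea would apply to the supervisor type check further down.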



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -161,29 +172,26 @@ public int computeTaskCountForRollover()
   public int computeTaskCountForScaleAction()
   {
     lastKnownMetrics = collectMetrics();
-    if (lastKnownMetrics == null) {
-      log.debug("Metrics not available for supervisorId [%s], skipping scaling action", supervisorId);
-      return -1;
-    }
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
+    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
 
     // Perform scale-up actions; scale-down actions only if configured.
-    int taskCount = -1;
+    final int taskCount;
     if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
       lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
-      log.info("New task count [%d] on supervisor [%s], scaling up", taskCount, supervisorId);
+      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
     } else if (!config.isScaleDownOnTaskRolloverOnly()
                && isScaleActionAllowed()
                && optimalTaskCount < currentTaskCount
                && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
       lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
-      log.info("New task count [%d] on supervisor [%s], scaling down", taskCount, supervisorId);
+      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
     } else {
-      log.info("No scaling required for supervisor [%s]", supervisorId);
+      taskCount = -1;
+      log.debug("No scaling required for supervisor [%s]", supervisorId);

Review Comment:
   Nit: `supervisor [%s]` -> `supervisor[%s]`, to match the other log messages in this method.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2626,8 +2640,8 @@ private void verifyAndMergeCheckpoints(
           taskGroup.checkpointSequences.clear();
           taskGroup.checkpointSequences.putAll(latestCheckpoints);
         } else {
-          log.debug(
-              "Adding task[%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]",
+          log.warn(
+              "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with checkpoints[%s] in metadata store",

Review Comment:
   I wonder if it would be useful to add a checkpoint diff function, so that operators don't need to compare the two checkpoints manually?
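
   A rough sketch of what such a helper could look like, assuming checkpoints are shaped as sequenceId -> partition -> offset. `CheckpointDiff` and `diff` are hypothetical names, not existing Druid APIs, and the output format is only a suggestion:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical helper, not part of Druid: reports only the entries on which
 * two checkpoint maps (sequenceId -> partition -> offset) disagree.
 */
public class CheckpointDiff
{
  public static <P extends Comparable<P>, O> Map<String, String> diff(
      Map<Integer, Map<P, O>> taskCheckpoints,
      Map<Integer, Map<P, O>> storedCheckpoints
  )
  {
    final Map<String, String> differences = new TreeMap<>();

    // Visit every sequence id that appears on either side.
    final TreeSet<Integer> sequenceIds = new TreeSet<>(taskCheckpoints.keySet());
    sequenceIds.addAll(storedCheckpoints.keySet());

    for (Integer sequenceId : sequenceIds) {
      final Map<P, O> fromTask = taskCheckpoints.getOrDefault(sequenceId, Collections.emptyMap());
      final Map<P, O> fromStore = storedCheckpoints.getOrDefault(sequenceId, Collections.emptyMap());

      // Visit every partition that appears on either side of this sequence.
      final TreeSet<P> partitions = new TreeSet<>(fromTask.keySet());
      partitions.addAll(fromStore.keySet());

      for (P partition : partitions) {
        final O taskOffset = fromTask.get(partition);
        final O storedOffset = fromStore.get(partition);
        // A missing entry shows up as null and is reported as a difference.
        if (!Objects.equals(taskOffset, storedOffset)) {
          differences.put(
              "sequence[" + sequenceId + "] partition[" + partition + "]",
              "task[" + taskOffset + "] != store[" + storedOffset + "]"
          );
        }
      }
    }
    return differences;
  }
}
```

   The warning could then log just the differing entries instead of both full checkpoint maps.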



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2911,7 +2964,13 @@ protected boolean supportsPartitionExpiration()
 
   public int getPartitionCount()
   {
-    return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+    recordSupplierLock.lock();

Review Comment:
   Nice: this fixes a race I've seen where the auto-scaler races with the `recordSupplier` on reads.
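
   For reference, the general pattern as I understand it, sketched in plain Java under the assumption that `recordSupplierLock` is a `ReentrantLock`. `GuardedReader` and `read` are hypothetical names, and the rest of the real method body is not visible in this hunk:

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * Hypothetical illustration, not Druid code: every read of a shared,
 * non-thread-safe resource goes through the same lock.
 */
public class GuardedReader
{
  private final ReentrantLock lock = new ReentrantLock();

  public <T> T read(Supplier<T> reader)
  {
    lock.lock();
    try {
      return reader.get();
    }
    finally {
      // Always release the lock, even if the read throws.
      lock.unlock();
    }
  }

  public boolean isLocked()
  {
    return lock.isLocked();
  }
}
```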



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########


Review Comment:
   Do we not need to call `hasTaskRolloverStarted.set(true);` here as well, for completeness? For example, what if a user manually triggers this from the UI?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -62,10 +63,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
   public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
   public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
+  public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";
 
   static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
   static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
 
+  /**
+   * If average partition lag crosses this value and the processing rate is
+   * still zero, scaling actions are skipped and an alert is raised.
+   */
+  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;

Review Comment:
   I'm not too crazy about hardcoding this value. Could we expose it as a config? The reason I ask is that it's entirely feasible for a topic to never exceed 10k events of lag (critical topics might only ever see a few hundred events of lag).
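
   Something along these lines is what I have in mind: a sketch in plain Java, keeping the current 10,000 as the default. `IdlenessLagConfig`, its accessor, and `shouldSkipScaling` are hypothetical names. In the PR this would presumably live on the existing auto-scaler config with the usual JSON property wiring:

```java
/**
 * Hypothetical config holder, not part of Druid: makes the idleness lag
 * threshold overridable while keeping the current value as the default.
 */
public class IdlenessLagConfig
{
  static final int DEFAULT_MAX_IDLENESS_PARTITION_LAG = 10_000;

  private final int maxIdlenessPartitionLag;

  public IdlenessLagConfig(Integer maxIdlenessPartitionLag)
  {
    if (maxIdlenessPartitionLag != null && maxIdlenessPartitionLag <= 0) {
      throw new IllegalArgumentException("maxIdlenessPartitionLag must be positive");
    }
    // A null value means "not configured", so fall back to the default.
    this.maxIdlenessPartitionLag = maxIdlenessPartitionLag == null
                                   ? DEFAULT_MAX_IDLENESS_PARTITION_LAG
                                   : maxIdlenessPartitionLag;
  }

  public int getMaxIdlenessPartitionLag()
  {
    return maxIdlenessPartitionLag;
  }

  /**
   * Mirrors the Javadoc in the diff: skip scaling when the average partition
   * lag has crossed the threshold while the processing rate is still zero.
   */
  public boolean shouldSkipScaling(double avgPartitionLag, double processingRate)
  {
    return avgPartitionLag > maxIdlenessPartitionLag && processingRate == 0.0;
  }
}
```

   With a threshold of 500, a stalled topic sitting at 600 events of lag would be flagged, whereas the default of 10,000 would never catch it.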



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -219,7 +219,30 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
     }
 
     IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
-    return retVal;
+
+    if (shouldFailImmediately(retVal, task, toolbox)) {
+      return SegmentPublishResult.fail(retVal.getErrorMsg());
+    } else {
+      return retVal;
+    }
+  }
+
+  /**
+   * Checks if the current publish action should be failed without allowing any
+   * more retries. A failed publish action should be retried only if there is
+   * another task waiting to publish offsets for an overlapping set of partitions.
+   */
+  private boolean shouldFailImmediately(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
+  {
+    if (result.isSuccess() || !result.isRetryable() || startMetadata == null) {

Review Comment:
   I suppose this might regress the following case: task B reads before task A commits its write, then task B attempts its own write and fails (because the older task A's publish has completed). Task B then checks:
   ```
   if there is another task waiting to publish offsets for an overlapping set of partitions.
   ```
   which of course there isn't, since task A has already completed, so task B proceeds to fail immediately?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
     return false;
   }
 
+  /**
+   * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+   * that is currently waiting to publish offsets for the given partitions.
+   */
+  public boolean isAnotherTaskGroupPublishingToPartitions(
+      String supervisorId,
+      String taskId,
+      DataSourceMetadata startMetadata
+  )
+  {
+    try {
+      InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+      if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+        throw InvalidInput.exception("Start metadata[%s] is not streaming metadata", supervisorId);
+      }
+
+      Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+      if (supervisor == null) {
+        throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
+      }
+      if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
+        throw InvalidInput.exception("Supervisor[%s] is not a streaming supervisor", supervisorId);

Review Comment:
   Same comment as above about logging the type here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]