jtuglu1 commented on code in PR #19091:
URL: https://github.com/apache/druid/pull/19091#discussion_r2906970346
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
return false;
}
+ /**
+ * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+ * that is currently waiting to publish offsets for the given partitions.
+ */
+ public boolean isAnotherTaskGroupPublishingToPartitions(
+ String supervisorId,
+ String taskId,
+ DataSourceMetadata startMetadata
+ )
+ {
+ try {
+ InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+ if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+ throw InvalidInput.exception("Start metadata[%s] is not streaming metadata", supervisorId);
Review Comment:
Not sure when/how this could occur, but it might be worth logging the actual type here.
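For illustration, a standalone sketch of what including the runtime type in the message could look like (the class and method names are stand-ins, and a plain `String` stands in for Druid's `InvalidInput.exception(...)`):

```java
// Sketch only: formats the rejected metadata's runtime class into the
// error message, so an unexpected type shows up in logs instead of just
// the supervisor id.
public class MetadataTypeCheck
{
  public static String describeUnexpectedType(Object startMetadata, String supervisorId)
  {
    final String typeName = startMetadata == null ? "null" : startMetadata.getClass().getName();
    return String.format(
        "Start metadata of type[%s] for supervisor[%s] is not streaming metadata",
        typeName,
        supervisorId
    );
  }
}
```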
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -161,29 +172,26 @@ public int computeTaskCountForRollover()
public int computeTaskCountForScaleAction()
{
lastKnownMetrics = collectMetrics();
- if (lastKnownMetrics == null) {
- log.debug("Metrics not available for supervisorId [%s], skipping scaling action", supervisorId);
- return -1;
- }
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
+ final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
// Perform scale-up actions; scale-down actions only if configured.
- int taskCount = -1;
+ final int taskCount;
if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
- log.info("New task count [%d] on supervisor [%s], scaling up", taskCount, supervisorId);
+ log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& isScaleActionAllowed()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
- log.info("New task count [%d] on supervisor [%s], scaling down", taskCount, supervisorId);
+ log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
} else {
- log.info("No scaling required for supervisor [%s]", supervisorId);
+ taskCount = -1;
+ log.debug("No scaling required for supervisor [%s]", supervisorId);
Review Comment:
`supervisor [%s] -> supervisor[%s]`
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2626,8 +2640,8 @@ private void verifyAndMergeCheckpoints(
taskGroup.checkpointSequences.clear();
taskGroup.checkpointSequences.putAll(latestCheckpoints);
} else {
- log.debug(
- "Adding task[%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]",
+ "Adding task[%s] to kill list as its checkpoints[%s] are not consistent with checkpoints[%s] in metadata store",
Review Comment:
I wonder if it would be useful to add a checkpoint diff function for operators, so they don't need to manually compare the two checkpoints?
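A minimal sketch of what such a diff helper could look like, assuming checkpoints are plain partition-to-offset maps (the real checkpoint types in `SeekableStreamSupervisor` are more involved):

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical checkpoint-diff helper: given the task's checkpoints and
// the metadata-store checkpoints (partition -> offset), report only the
// partitions whose offsets disagree, so operators don't have to eyeball
// two large maps side by side.
public class CheckpointDiff
{
  public static Map<Integer, String> diff(
      Map<Integer, Long> taskCheckpoints,
      Map<Integer, Long> storedCheckpoints
  )
  {
    final Map<Integer, String> mismatches = new TreeMap<>();
    // Walk the union of partition ids from both sides.
    final TreeMap<Integer, Long> union = new TreeMap<>(taskCheckpoints);
    storedCheckpoints.forEach(union::putIfAbsent);
    for (Integer partition : union.keySet()) {
      final Long taskOffset = taskCheckpoints.get(partition);
      final Long storedOffset = storedCheckpoints.get(partition);
      if (!Objects.equals(taskOffset, storedOffset)) {
        mismatches.put(partition, "task=" + taskOffset + ", store=" + storedOffset);
      }
    }
    return mismatches;
  }
}
```

Logging only the mismatched partitions would make the kill-list warning above actionable at a glance.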
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2911,7 +2964,13 @@ protected boolean supportsPartitionExpiration()
public int getPartitionCount()
{
- return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+ recordSupplierLock.lock();
Review Comment:
Nice: this fixes a race I've seen between the auto-scaler and the `recordSupplier` on reads.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
Review Comment:
do we not need to set `hasTaskRolloverStarted.set(true);` here as well for
completeness? For example, what if a user manually triggers this from the UI?
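For completeness, a toy sketch of the suggestion: the manual-trigger path sets the same flag the scheduled rollover path does (the class and trigger method here are illustrative, not the actual supervisor API):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a UI-initiated rollover should flip the same flag as
// a scheduled one, so downstream checks see a rollover in progress either way.
public class RolloverFlag
{
  private final AtomicBoolean hasTaskRolloverStarted = new AtomicBoolean(false);

  public void triggerManualRollover()
  {
    // Mark the rollover exactly as the scheduled path does.
    hasTaskRolloverStarted.set(true);
  }

  public boolean hasStarted()
  {
    return hasTaskRolloverStarted.get();
  }
}
```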
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -62,10 +63,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
+ public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";
static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+ /**
+ * If average partition lag crosses this value and the processing rate is
+ * still zero, scaling actions are skipped and an alert is raised.
+ */
+ static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
Review Comment:
I'm not too keen on hardcoding this value. Could we expose it as a config? The reason I ask is that it's entirely feasible for a topic to never reach a lag above 10k events (critical topics may only ever see a lag of a few hundred events).
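A hypothetical sketch of what a configurable threshold could look like, defaulting to the current value when unset (the field name and constructor are assumptions, not the actual `CostBasedAutoScalerConfig` API):

```java
// Sketch only: a nullable config value falls back to the current hardcoded
// default of 10_000 when the operator doesn't set it, so existing behavior
// is preserved while low-traffic topics can lower the threshold.
public class AutoScalerIdlenessConfig
{
  private static final int DEFAULT_MAX_IDLENESS_PARTITION_LAG = 10_000;

  private final int maxIdlenessPartitionLag;

  public AutoScalerIdlenessConfig(Integer maxIdlenessPartitionLag)
  {
    this.maxIdlenessPartitionLag = maxIdlenessPartitionLag == null
                                   ? DEFAULT_MAX_IDLENESS_PARTITION_LAG
                                   : maxIdlenessPartitionLag;
  }

  public int getMaxIdlenessPartitionLag()
  {
    return maxIdlenessPartitionLag;
  }
}
```

With Jackson binding, the same fallback could be expressed via a `@JsonProperty`-annotated constructor parameter.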
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -219,7 +219,30 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
}
IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
- return retVal;
+
+ if (shouldFailImmediately(retVal, task, toolbox)) {
+ return SegmentPublishResult.fail(retVal.getErrorMsg());
+ } else {
+ return retVal;
+ }
+ }
+
+ /**
+ * Checks if the current publish action should be failed without allowing any
+ * more retries. A failed publish action should be retried only if there is
+ * another task waiting to publish offsets for an overlapping set of partitions.
+ */
+ private boolean shouldFailImmediately(SegmentPublishResult result, Task task, TaskActionToolbox toolbox)
+ {
+ if (result.isSuccess() || !result.isRetryable() || startMetadata == null) {
Review Comment:
I suppose this might regress the case where task B reads before task A commits its write. Task B then proceeds to write and fails (because the older task A publish completed first), after which it checks:
```
if there is another task waiting to publish offsets for an overlapping set
of partitions.
```
which of course there isn't, since task A already completed, so task B proceeds to fail?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -427,6 +430,48 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
return false;
}
+ /**
+ * Checks if there is a Task distinct from the given {@code taskId} or its replicas
+ * that is currently waiting to publish offsets for the given partitions.
+ */
+ public boolean isAnotherTaskGroupPublishingToPartitions(
+ String supervisorId,
+ String taskId,
+ DataSourceMetadata startMetadata
+ )
+ {
+ try {
+ InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
+ if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
+ throw InvalidInput.exception("Start metadata[%s] is not streaming metadata", supervisorId);
+ }
+
+ Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
+ if (supervisor == null) {
+ throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
+ }
+ if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
+ throw InvalidInput.exception("Supervisor[%s] is not a streaming supervisor", supervisorId);
Review Comment:
same about the type
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]