kfaraz commented on code in PR #18819: URL: https://github.com/apache/druid/pull/18819#discussion_r2605543075
########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; Review Comment: Let's use JUnit 5 for new tests. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java: ########## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Configuration for cost-based auto-scaling of seekable stream supervisor tasks. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are constrained to be factors/divisors of the partition count. 
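Regarding the JUnit 5 suggestion above: a minimal sketch, assuming the PR's CostBasedAutoScalerConfig class and Druid's DefaultObjectMapper, of what the new test's imports and assertions might look like under JUnit 5 (Jupiter). The test body and values are illustrative, not the PR's actual test.

```java
// Hedged sketch: JUnit 5 (Jupiter) equivalent of the JUnit 4 style used above.
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class CostBasedAutoScalerConfigTest
{
  private final ObjectMapper mapper = new DefaultObjectMapper();

  @Test
  public void testSerdeWithRequiredFields() throws Exception
  {
    final String json = "{\"enableTaskAutoScaler\": true, \"taskCountMin\": 1, \"taskCountMax\": 10}";
    final CostBasedAutoScalerConfig config = mapper.readValue(json, CostBasedAutoScalerConfig.class);

    // JUnit 5 swaps org.junit.Assert for org.junit.jupiter.api.Assertions.
    Assertions.assertTrue(config.getEnableTaskAutoScaler());
    Assertions.assertEquals(1, config.getTaskCountMin());
    Assertions.assertEquals(10, config.getTaskCountMax());
  }
}
```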
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CostBasedAutoScalerConfig implements AutoScalerConfig +{ + private static final long DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS = 3 * 60 * 1000; // 3 minutes + private static final long DEFAULT_SCALE_ACTION_START_DELAY_MILLIS = 10 * 60 * 1000; // 10 minutes + private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 1000; // 15 minutes + private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 1200000; // 20 minutes + private static final double DEFAULT_LAG_WEIGHT = 0.25; + private static final double DEFAULT_IDLE_WEIGHT = 0.75; + + private final boolean enableTaskAutoScaler; + private final int taskCountMax; + private final int taskCountMin; + private Integer taskCountStart; + private final long minTriggerScaleActionFrequencyMillis; + private final Double stopTaskCountRatio; + private final long metricsCollectionIntervalMillis; + private final long scaleActionStartDelayMillis; + private final long scaleActionPeriodMillis; Review Comment: Let's skip adding these configs for now and use some standard default values. If needed, we can add them later. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java: ########## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Configuration for cost-based auto-scaling of seekable stream supervisor tasks. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are constrained to be factors/divisors of the partition count. 
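Regarding the suggestion above to skip the extra timing configs: a hedged sketch of the "fixed defaults, no JSON properties" approach. The holder class and constant names are illustrative only, not the final shape of the change.

```java
// Illustrative only: the timing knobs become compile-time defaults instead of
// @JsonProperty fields, so the supervisor spec stays small until tuning is needed.
public final class CostBasedAutoScalerDefaults
{
  public static final long METRICS_COLLECTION_INTERVAL_MILLIS = 3 * 60 * 1000L;  // 3 minutes
  public static final long SCALE_ACTION_START_DELAY_MILLIS = 10 * 60 * 1000L;    // 10 minutes
  public static final long SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 1000L;         // 15 minutes

  private CostBasedAutoScalerDefaults()
  {
    // constants holder, not instantiable
  }
}
```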
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CostBasedAutoScalerConfig implements AutoScalerConfig +{ + private static final long DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS = 3 * 60 * 1000; // 3 minutes + private static final long DEFAULT_SCALE_ACTION_START_DELAY_MILLIS = 10 * 60 * 1000; // 10 minutes + private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 1000; // 15 minutes + private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 1200000; // 20 minutes + private static final double DEFAULT_LAG_WEIGHT = 0.25; + private static final double DEFAULT_IDLE_WEIGHT = 0.75; + + private final boolean enableTaskAutoScaler; + private final int taskCountMax; + private final int taskCountMin; + private Integer taskCountStart; + private final long minTriggerScaleActionFrequencyMillis; + private final Double stopTaskCountRatio; + private final long metricsCollectionIntervalMillis; + private final long scaleActionStartDelayMillis; + private final long scaleActionPeriodMillis; + + private final double lagWeight; + private final double idleWeight; + + @JsonCreator + public CostBasedAutoScalerConfig( + @JsonProperty("taskCountMax") Integer taskCountMax, + @JsonProperty("taskCountMin") Integer taskCountMin, + @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler, + @Nullable @JsonProperty("taskCountStart") Integer taskCountStart, Review Comment: Based on the discussions on the other PR #18745 , I wonder if we should just skip adding a `taskCountStart` in this auto-scaler config. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Cost-based auto-scaler for seekable stream supervisors. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are selected from predefined values (not arbitrary factors). + * Scale-up happens incrementally, scale-down only during task rollover. + */ +public class CostBasedAutoScaler implements SupervisorTaskAutoScaler +{ + private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class); + + private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2; + + private static final Map<Integer, List<Integer>> FACTORS_CACHE = new HashMap<>(); + + private final String dataSource; + /** + * Atomic reference to CostMetrics object. All operations must be performed + * with sequentially consistent semantics (volatile reads/writes). + * However, it may be fine-tuned with acquire/release semantics, + * but requires careful reasoning about correctness. 
+ */ + private final AtomicReference<CostMetrics> currentMetrics; + private final ScheduledExecutorService metricsCollectionExec; + private final ScheduledExecutorService scalingDecisionExec; + private final SupervisorSpec spec; + private final SeekableStreamSupervisor supervisor; + private final CostBasedAutoScalerConfig config; + private final ServiceEmitter emitter; + private final ServiceMetricEvent.Builder metricBuilder; + private final WeightedCostFunction costFunction; + + public CostBasedAutoScaler( + SeekableStreamSupervisor supervisor, + String dataSource, + CostBasedAutoScalerConfig config, + SupervisorSpec spec, + ServiceEmitter emitter + ) + { + this.dataSource = dataSource; + this.config = config; + this.spec = spec; + this.supervisor = supervisor; + this.emitter = emitter; + + final String supervisorId = StringUtils.format("Supervisor-%s", dataSource); + + this.currentMetrics = new AtomicReference<>(null); + this.costFunction = new WeightedCostFunction(); + + this.metricsCollectionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d" + ); + this.scalingDecisionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d" + ); + + this.metricBuilder = ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension( + DruidMetrics.STREAM, + this.supervisor.getIoConfig().getStream() + ); + } + + @Override + public void start() + { + Callable<Integer> scaleAction = () -> computeOptimalTaskCount(currentMetrics); + Runnable onSuccessfulScale = () -> currentMetrics.set(null); + + metricsCollectionExec.scheduleAtFixedRate( + collectMetrics(), + config.getMetricsCollectionIntervalMillis(), + config.getMetricsCollectionIntervalMillis(), + TimeUnit.MILLISECONDS + ); + + scalingDecisionExec.scheduleAtFixedRate( + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), + config.getScaleActionStartDelayMillis(), + config.getScaleActionPeriodMillis(), + TimeUnit.MILLISECONDS + ); + + log.info( + "CostBasedAutoScaler started for dataSource [%s]: collecting metrics every [%d]ms, " + + "evaluating scaling every [%d]ms", + dataSource, + config.getMetricsCollectionIntervalMillis(), + config.getScaleActionPeriodMillis() + ); + } + + @Override + public void stop() + { + scalingDecisionExec.shutdownNow(); + metricsCollectionExec.shutdownNow(); + log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource); + } + + @Override + public void reset() + { + currentMetrics.set(null); + } + + private Runnable collectMetrics() + { + return () -> { + if (spec.isSuspended()) { + log.debug("Supervisor [%s] is suspended, skipping metrics collection", dataSource); + return; + } + + final LagStats lagStats = supervisor.computeLagStats(); + if (lagStats == null) { + log.debug("Lag stats unavailable for dataSource [%s], skipping collection", dataSource); + return; + } + + final int currentTaskCount = supervisor.getActiveTaskGroupsCount(); + final int partitionCount = supervisor.getPartitionCount(); + final double avgPartitionLag = partitionCount > 0 + ? 
(double) lagStats.getTotalLag() / partitionCount + : 0.0; + final double pollIdleRatio = supervisor.getPollIdleRatioMetric(); + + currentMetrics.compareAndSet( + null, + new CostMetrics( + System.currentTimeMillis(), + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio + ) + ); + + log.debug("Collected metrics for dataSource [%s]", dataSource); + }; + } + + /** + * @return optimal task count, or -1 if no scaling action needed + */ + public int computeOptimalTaskCount(AtomicReference<CostMetrics> currentMetricsRef) + { + final CostMetrics currentMetrics = currentMetricsRef.get(); + if (currentMetrics == null) { + log.debug("No metrics available yet for dataSource [%s]", dataSource); + return -1; + } + + if (currentMetrics.getPartitionCount() <= 0 || currentMetrics.getCurrentTaskCount() <= 0) { + return -1; + } + + final int currentTaskCount = currentMetrics.getCurrentTaskCount(); + final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent( + currentMetrics.getPartitionCount(), + this::computeFactors + ); + + if (validTaskCounts.isEmpty()) { + log.warn("No valid task counts after applying constraints for dataSource [%s]", dataSource); + return -1; + } + + // Update bounds with observed lag BEFORE optimization loop + // This ensures normalization uses historical observed values, not predicted values + costFunction.updateLagBounds(currentMetrics.getAvgPartitionLag()); + + int optimalTaskCount = -1; + double optimalCost = Double.POSITIVE_INFINITY; + + // TODO: what if somehow it is not in the validTaskCounts list? + final int bestTaskCountIndex = validTaskCounts.indexOf(currentTaskCount); + for (int i = bestTaskCountIndex - SCALE_FACTOR_DISCRETE_DISTANCE; + i <= bestTaskCountIndex + SCALE_FACTOR_DISCRETE_DISTANCE; i++) { + // Range check. + if (i < 0 || i >= validTaskCounts.size()) { + continue; + } + int taskCount = validTaskCounts.get(i); + if (taskCount < config.getTaskCountMin()) { + continue; + } else if (taskCount > config.getTaskCountMax()) { + break; + } + double cost = costFunction.computeCost(currentMetrics, taskCount, config); + log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost); + if (cost < optimalCost) { + optimalTaskCount = taskCount; + optimalCost = cost; + } + } + + System.err.println("Emitting " + optimalTaskCount); + // emitter.emit(metricBuilder.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, optimalTaskCount)); Review Comment: Please remove these. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Cost-based auto-scaler for seekable stream supervisors. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are selected from predefined values (not arbitrary factors). + * Scale-up happens incrementally, scale-down only during task rollover. + */ +public class CostBasedAutoScaler implements SupervisorTaskAutoScaler +{ + private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class); + + private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2; + + private static final Map<Integer, List<Integer>> FACTORS_CACHE = new HashMap<>(); + + private final String dataSource; + /** + * Atomic reference to CostMetrics object. All operations must be performed + * with sequentially consistent semantics (volatile reads/writes). + * However, it may be fine-tuned with acquire/release semantics, + * but requires careful reasoning about correctness. 
+ */ + private final AtomicReference<CostMetrics> currentMetrics; + private final ScheduledExecutorService metricsCollectionExec; + private final ScheduledExecutorService scalingDecisionExec; + private final SupervisorSpec spec; + private final SeekableStreamSupervisor supervisor; + private final CostBasedAutoScalerConfig config; + private final ServiceEmitter emitter; + private final ServiceMetricEvent.Builder metricBuilder; + private final WeightedCostFunction costFunction; + + public CostBasedAutoScaler( + SeekableStreamSupervisor supervisor, + String dataSource, + CostBasedAutoScalerConfig config, + SupervisorSpec spec, + ServiceEmitter emitter + ) + { + this.dataSource = dataSource; + this.config = config; + this.spec = spec; + this.supervisor = supervisor; + this.emitter = emitter; + + final String supervisorId = StringUtils.format("Supervisor-%s", dataSource); + + this.currentMetrics = new AtomicReference<>(null); + this.costFunction = new WeightedCostFunction(); + + this.metricsCollectionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d" + ); + this.scalingDecisionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d" + ); + + this.metricBuilder = ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension( + DruidMetrics.STREAM, + this.supervisor.getIoConfig().getStream() + ); + } + + @Override + public void start() + { + Callable<Integer> scaleAction = () -> computeOptimalTaskCount(currentMetrics); + Runnable onSuccessfulScale = () -> currentMetrics.set(null); + + metricsCollectionExec.scheduleAtFixedRate( + collectMetrics(), + config.getMetricsCollectionIntervalMillis(), + config.getMetricsCollectionIntervalMillis(), + TimeUnit.MILLISECONDS + ); + + scalingDecisionExec.scheduleAtFixedRate( + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), + config.getScaleActionStartDelayMillis(), + config.getScaleActionPeriodMillis(), + TimeUnit.MILLISECONDS + ); + + log.info( + "CostBasedAutoScaler started for dataSource [%s]: collecting metrics every [%d]ms, " + + "evaluating scaling every [%d]ms", + dataSource, + config.getMetricsCollectionIntervalMillis(), + config.getScaleActionPeriodMillis() + ); + } + + @Override + public void stop() + { + scalingDecisionExec.shutdownNow(); + metricsCollectionExec.shutdownNow(); + log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource); + } + + @Override + public void reset() + { + currentMetrics.set(null); + } + + private Runnable collectMetrics() + { + return () -> { + if (spec.isSuspended()) { + log.debug("Supervisor [%s] is suspended, skipping metrics collection", dataSource); + return; + } + + final LagStats lagStats = supervisor.computeLagStats(); + if (lagStats == null) { + log.debug("Lag stats unavailable for dataSource [%s], skipping collection", dataSource); + return; + } + + final int currentTaskCount = supervisor.getActiveTaskGroupsCount(); + final int partitionCount = supervisor.getPartitionCount(); + final double avgPartitionLag = partitionCount > 0 + ? 
(double) lagStats.getTotalLag() / partitionCount + : 0.0; + final double pollIdleRatio = supervisor.getPollIdleRatioMetric(); + + currentMetrics.compareAndSet( + null, + new CostMetrics( + System.currentTimeMillis(), + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio + ) + ); + + log.debug("Collected metrics for dataSource [%s]", dataSource); + }; + } + + /** + * @return optimal task count, or -1 if no scaling action needed + */ + public int computeOptimalTaskCount(AtomicReference<CostMetrics> currentMetricsRef) + { + final CostMetrics currentMetrics = currentMetricsRef.get(); + if (currentMetrics == null) { + log.debug("No metrics available yet for dataSource [%s]", dataSource); + return -1; + } + + if (currentMetrics.getPartitionCount() <= 0 || currentMetrics.getCurrentTaskCount() <= 0) { + return -1; + } + + final int currentTaskCount = currentMetrics.getCurrentTaskCount(); + final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent( + currentMetrics.getPartitionCount(), + this::computeFactors + ); + + if (validTaskCounts.isEmpty()) { + log.warn("No valid task counts after applying constraints for dataSource [%s]", dataSource); + return -1; + } + + // Update bounds with observed lag BEFORE optimization loop + // This ensures normalization uses historical observed values, not predicted values + costFunction.updateLagBounds(currentMetrics.getAvgPartitionLag()); + + int optimalTaskCount = -1; + double optimalCost = Double.POSITIVE_INFINITY; + + // TODO: what if somehow it is not in the validTaskCounts list? + final int bestTaskCountIndex = validTaskCounts.indexOf(currentTaskCount); + for (int i = bestTaskCountIndex - SCALE_FACTOR_DISCRETE_DISTANCE; + i <= bestTaskCountIndex + SCALE_FACTOR_DISCRETE_DISTANCE; i++) { + // Range check. 
+ if (i < 0 || i >= validTaskCounts.size()) { + continue; + } + int taskCount = validTaskCounts.get(i); + if (taskCount < config.getTaskCountMin()) { + continue; + } else if (taskCount > config.getTaskCountMax()) { + break; + } + double cost = costFunction.computeCost(currentMetrics, taskCount, config); + log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost); + if (cost < optimalCost) { + optimalTaskCount = taskCount; + optimalCost = cost; + } + } + + System.err.println("Emitting " + optimalTaskCount); + // emitter.emit(metricBuilder.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, optimalTaskCount)); + emitter.emit(ServiceMetricEvent.builder() + .setMetric("task/autoScaler/costBased/optimalTaskCount", (long) optimalTaskCount)); + + log.info( + "Cost-based scaling evaluation for dataSource [%s]: current=%d, optimal=%d, cost=%.4f, " + + "avgPartitionLag=%.2f, pollIdleRatio=%.3f, validOptions=%d", + dataSource, + currentMetrics.getCurrentTaskCount(), + optimalTaskCount, + optimalCost, + currentMetrics.getAvgPartitionLag(), + currentMetrics.getPollIdleRatio(), + validTaskCounts.size() + ); + + if (optimalTaskCount > currentTaskCount) { + log.info( + "Scale UP dataSource [%s] from %d to %d tasks", + dataSource, + currentTaskCount, + optimalTaskCount + ); + return optimalTaskCount; + } else if (optimalTaskCount < currentTaskCount) { + log.info( + "Scale DOWN dataSource [%s] from %d to %d tasks (during rollover only)", + dataSource, + currentTaskCount, + optimalTaskCount + ); Review Comment: This logging is already done by the supervisor https://github.com/apache/druid/blob/b0214b86d93fe418497c1f6f77de50a2cca2c6a1/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L566-L572 ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4371,6 +4371,11 @@ public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> getPartitionOffset return partitionOffsets; } + public double getPollIdleRatioMetric() + { + return recordSupplier.getPollIdleRatioMetric(); Review Comment: Would this be the correct metric? This would tell us the poll-to-idle ratio for the consumer used by the supervisor itself. This consumer is never used to poll records and is used only for fetching the latest offsets from the stream. We would need to get the poll-to-idle ratio from the consumers used by individual tasks and then compute their average. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java: ########## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
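Regarding the poll-idle ratio comment above: a hedged sketch of averaging the ratio across the consumers used by individual tasks, rather than reading it from the supervisor's own consumer. The per-task map is a hypothetical input (e.g. collected from task reports); no such accessor exists in Druid today.

```java
// Hypothetical helper: assumes the supervisor can surface a per-task map of
// poll-idle ratios; the map source is an assumption, named only for illustration.
import java.util.Map;

public final class PollIdleRatioAverager
{
  private PollIdleRatioAverager()
  {
  }

  /**
   * Averages the poll-idle ratio reported by each task's consumer.
   * Returns 0.0 when no task has reported yet.
   */
  public static double averagePollIdleRatio(Map<String, Double> ratioByTaskId)
  {
    if (ratioByTaskId == null || ratioByTaskId.isEmpty()) {
      return 0.0;
    }
    double sum = 0.0;
    for (double ratio : ratioByTaskId.values()) {
      sum += ratio;
    }
    return sum / ratioByTaskId.size();
  }
}
```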
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Configuration for cost-based auto-scaling of seekable stream supervisor tasks. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are constrained to be factors/divisors of the partition count. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CostBasedAutoScalerConfig implements AutoScalerConfig +{ + private static final long DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS = 3 * 60 * 1000; // 3 minutes + private static final long DEFAULT_SCALE_ACTION_START_DELAY_MILLIS = 10 * 60 * 1000; // 10 minutes + private static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 15 * 60 * 1000; // 15 minutes + private static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 1200000; // 20 minutes + private static final double DEFAULT_LAG_WEIGHT = 0.25; + private static final double DEFAULT_IDLE_WEIGHT = 0.75; + + private final boolean enableTaskAutoScaler; + private final int taskCountMax; + private final int taskCountMin; + private Integer taskCountStart; + private final long minTriggerScaleActionFrequencyMillis; + private final Double stopTaskCountRatio; + private final long metricsCollectionIntervalMillis; + private final long scaleActionStartDelayMillis; + private final long scaleActionPeriodMillis; + + private final double lagWeight; + private final double idleWeight; + + @JsonCreator + public CostBasedAutoScalerConfig( + @JsonProperty("taskCountMax") Integer taskCountMax, + @JsonProperty("taskCountMin") Integer taskCountMin, + @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler, + @Nullable @JsonProperty("taskCountStart") Integer taskCountStart, + @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis, + @Nullable @JsonProperty("stopTaskCountRatio") Double stopTaskCountRatio, + @Nullable @JsonProperty("metricsCollectionIntervalMillis") Long metricsCollectionIntervalMillis, + @Nullable @JsonProperty("scaleActionStartDelayMillis") Long scaleActionStartDelayMillis, + @Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis, + @Nullable @JsonProperty("lagWeight") Double lagWeight, + @Nullable @JsonProperty("idleWeight") Double idleWeight + ) + { + this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false; + + // Timing configuration with defaults + this.metricsCollectionIntervalMillis = metricsCollectionIntervalMillis != null + ? metricsCollectionIntervalMillis + : DEFAULT_METRICS_COLLECTION_INTERVAL_MILLIS; + this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null + ? scaleActionStartDelayMillis + : DEFAULT_SCALE_ACTION_START_DELAY_MILLIS; + this.scaleActionPeriodMillis = scaleActionPeriodMillis != null + ? 
scaleActionPeriodMillis + : DEFAULT_SCALE_ACTION_PERIOD_MILLIS; + this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis != null + ? minTriggerScaleActionFrequencyMillis + : DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS; + + // Cost function weights with defaults + this.lagWeight = lagWeight != null ? lagWeight : DEFAULT_LAG_WEIGHT; + this.idleWeight = idleWeight != null ? idleWeight : DEFAULT_IDLE_WEIGHT; + + if (this.enableTaskAutoScaler) { + Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true"); + Preconditions.checkNotNull(taskCountMin, "taskCountMin is required when enableTaskAutoScaler is true"); + Preconditions.checkArgument(taskCountMax >= taskCountMin, "taskCountMax must be >= taskCountMin"); + Preconditions.checkArgument( + taskCountStart == null || (taskCountStart >= taskCountMin && taskCountStart <= taskCountMax), + "taskCountMin <= taskCountStart <= taskCountMax" + ); + this.taskCountMax = taskCountMax; + this.taskCountMin = taskCountMin; + } else { + this.taskCountMax = taskCountMax != null ? taskCountMax : 0; + this.taskCountMin = taskCountMin != null ? taskCountMin : 0; + } + this.taskCountStart = taskCountStart; + + // Validate stopTaskCountRatio + Preconditions.checkArgument( + stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0), + "0.0 < stopTaskCountRatio <= 1.0" + ); + this.stopTaskCountRatio = stopTaskCountRatio; + + // Validate weights are non-negative + Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0"); + Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0"); + } + + /** + * Creates a new Builder for constructing CostBasedAutoScalerConfig instances. + */ + public static Builder builder() + { + return new Builder(); + } + + @Override + @JsonProperty + public boolean getEnableTaskAutoScaler() + { + return enableTaskAutoScaler; + } + + @Override + @JsonProperty + public int getTaskCountMax() + { + return taskCountMax; + } + + @Override + @JsonProperty + public int getTaskCountMin() + { + return taskCountMin; + } + + @Override + @JsonProperty + @Nullable + public Integer getTaskCountStart() + { + return taskCountStart; + } + + @Override + public void setTaskCountStart(int newTaskCountStart) + { + this.taskCountStart = newTaskCountStart; + } + + @Override + @JsonProperty + public long getMinTriggerScaleActionFrequencyMillis() + { + return minTriggerScaleActionFrequencyMillis; + } + + @Override + @JsonProperty + @Nullable + public Double getStopTaskCountRatio() + { + return stopTaskCountRatio; + } + + @JsonProperty + public long getMetricsCollectionIntervalMillis() + { + return metricsCollectionIntervalMillis; + } + + @JsonProperty + public long getScaleActionStartDelayMillis() + { + return scaleActionStartDelayMillis; + } + + @JsonProperty + public long getScaleActionPeriodMillis() + { + return scaleActionPeriodMillis; + } + + @JsonProperty + public double getLagWeight() + { + return lagWeight; + } + + @JsonProperty + public double getIdleWeight() + { + return idleWeight; + } + + @Override + public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) + { + return new CostBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec, emitter); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CostBasedAutoScalerConfig that = 
(CostBasedAutoScalerConfig) o; + + return enableTaskAutoScaler == that.enableTaskAutoScaler + && taskCountMax == that.taskCountMax + && taskCountMin == that.taskCountMin + && minTriggerScaleActionFrequencyMillis == that.minTriggerScaleActionFrequencyMillis + && metricsCollectionIntervalMillis == that.metricsCollectionIntervalMillis + && scaleActionStartDelayMillis == that.scaleActionStartDelayMillis + && scaleActionPeriodMillis == that.scaleActionPeriodMillis + && Double.compare(that.lagWeight, lagWeight) == 0 + && Double.compare(that.idleWeight, idleWeight) == 0 + && Objects.equals(taskCountStart, that.taskCountStart) + && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio); + } + + @Override + public int hashCode() + { + return Objects.hash( + enableTaskAutoScaler, + taskCountMax, + taskCountMin, + taskCountStart, + minTriggerScaleActionFrequencyMillis, + stopTaskCountRatio, + metricsCollectionIntervalMillis, + scaleActionStartDelayMillis, + scaleActionPeriodMillis, + lagWeight, + idleWeight + ); + } + + @Override + public String toString() + { + return "CostBasedAutoScalerConfig{" + + "enableTaskAutoScaler=" + enableTaskAutoScaler + + ", taskCountMax=" + taskCountMax + + ", taskCountMin=" + taskCountMin + + ", taskCountStart=" + taskCountStart + + ", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis + + ", stopTaskCountRatio=" + stopTaskCountRatio + + ", metricsCollectionIntervalMillis=" + metricsCollectionIntervalMillis + + ", scaleActionStartDelayMillis=" + scaleActionStartDelayMillis + + ", scaleActionPeriodMillis=" + scaleActionPeriodMillis + + ", lagWeight=" + lagWeight + + ", idleWeight=" + idleWeight + + '}'; + } + + /** + * Builder for CostBasedAutoScalerConfig. + * Provides a fluent API for constructing configuration instances. + */ + public static class Builder Review Comment: Is the Builder used anywhere? ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Cost-based auto-scaler for seekable stream supervisors. + * Uses a cost function combining lag and idle time metrics to determine optimal task counts. + * Task counts are selected from predefined values (not arbitrary factors). + * Scale-up happens incrementally, scale-down only during task rollover. + */ +public class CostBasedAutoScaler implements SupervisorTaskAutoScaler +{ + private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class); + + private static final int SCALE_FACTOR_DISCRETE_DISTANCE = 2; + + private static final Map<Integer, List<Integer>> FACTORS_CACHE = new HashMap<>(); + + private final String dataSource; + /** + * Atomic reference to CostMetrics object. All operations must be performed + * with sequentially consistent semantics (volatile reads/writes). + * However, it may be fine-tuned with acquire/release semantics, + * but requires careful reasoning about correctness. 
+ */ + private final AtomicReference<CostMetrics> currentMetrics; + private final ScheduledExecutorService metricsCollectionExec; + private final ScheduledExecutorService scalingDecisionExec; + private final SupervisorSpec spec; + private final SeekableStreamSupervisor supervisor; + private final CostBasedAutoScalerConfig config; + private final ServiceEmitter emitter; + private final ServiceMetricEvent.Builder metricBuilder; + private final WeightedCostFunction costFunction; + + public CostBasedAutoScaler( + SeekableStreamSupervisor supervisor, + String dataSource, + CostBasedAutoScalerConfig config, + SupervisorSpec spec, + ServiceEmitter emitter + ) + { + this.dataSource = dataSource; + this.config = config; + this.spec = spec; + this.supervisor = supervisor; + this.emitter = emitter; + + final String supervisorId = StringUtils.format("Supervisor-%s", dataSource); + + this.currentMetrics = new AtomicReference<>(null); + this.costFunction = new WeightedCostFunction(); + + this.metricsCollectionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedMetrics-%d" + ); + this.scalingDecisionExec = Execs.scheduledSingleThreaded( + StringUtils.encodeForFormat(supervisorId) + "-CostBasedScaling-%d" + ); + + this.metricBuilder = ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension( + DruidMetrics.STREAM, + this.supervisor.getIoConfig().getStream() + ); + } + + @Override + public void start() + { + Callable<Integer> scaleAction = () -> computeOptimalTaskCount(currentMetrics); + Runnable onSuccessfulScale = () -> currentMetrics.set(null); + + metricsCollectionExec.scheduleAtFixedRate( + collectMetrics(), + config.getMetricsCollectionIntervalMillis(), + config.getMetricsCollectionIntervalMillis(), + TimeUnit.MILLISECONDS + ); + + scalingDecisionExec.scheduleAtFixedRate( + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), + config.getScaleActionStartDelayMillis(), + config.getScaleActionPeriodMillis(), + TimeUnit.MILLISECONDS + ); + + log.info( + "CostBasedAutoScaler started for dataSource [%s]: collecting metrics every [%d]ms, " + + "evaluating scaling every [%d]ms", + dataSource, + config.getMetricsCollectionIntervalMillis(), + config.getScaleActionPeriodMillis() + ); + } + + @Override + public void stop() + { + scalingDecisionExec.shutdownNow(); + metricsCollectionExec.shutdownNow(); + log.info("CostBasedAutoScaler stopped for dataSource [%s]", dataSource); + } + + @Override + public void reset() + { + currentMetrics.set(null); + } + + private Runnable collectMetrics() + { + return () -> { + if (spec.isSuspended()) { + log.debug("Supervisor [%s] is suspended, skipping metrics collection", dataSource); + return; + } + + final LagStats lagStats = supervisor.computeLagStats(); + if (lagStats == null) { + log.debug("Lag stats unavailable for dataSource [%s], skipping collection", dataSource); + return; + } + + final int currentTaskCount = supervisor.getActiveTaskGroupsCount(); + final int partitionCount = supervisor.getPartitionCount(); + final double avgPartitionLag = partitionCount > 0 + ? 
(double) lagStats.getTotalLag() / partitionCount + : 0.0; + final double pollIdleRatio = supervisor.getPollIdleRatioMetric(); + + currentMetrics.compareAndSet( + null, + new CostMetrics( + System.currentTimeMillis(), + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio + ) + ); + + log.debug("Collected metrics for dataSource [%s]", dataSource); + }; + } + + /** + * @return optimal task count, or -1 if no scaling action needed + */ + public int computeOptimalTaskCount(AtomicReference<CostMetrics> currentMetricsRef) + { + final CostMetrics currentMetrics = currentMetricsRef.get(); + if (currentMetrics == null) { + log.debug("No metrics available yet for dataSource [%s]", dataSource); + return -1; + } + + if (currentMetrics.getPartitionCount() <= 0 || currentMetrics.getCurrentTaskCount() <= 0) { + return -1; + } + + final int currentTaskCount = currentMetrics.getCurrentTaskCount(); + final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent( + currentMetrics.getPartitionCount(), + this::computeFactors + ); + + if (validTaskCounts.isEmpty()) { + log.warn("No valid task counts after applying constraints for dataSource [%s]", dataSource); + return -1; + } + + // Update bounds with observed lag BEFORE optimization loop + // This ensures normalization uses historical observed values, not predicted values + costFunction.updateLagBounds(currentMetrics.getAvgPartitionLag()); + + int optimalTaskCount = -1; + double optimalCost = Double.POSITIVE_INFINITY; + + // TODO: what if somehow it is not in the validTaskCounts list? + final int bestTaskCountIndex = validTaskCounts.indexOf(currentTaskCount); + for (int i = bestTaskCountIndex - SCALE_FACTOR_DISCRETE_DISTANCE; + i <= bestTaskCountIndex + SCALE_FACTOR_DISCRETE_DISTANCE; i++) { + // Range check. + if (i < 0 || i >= validTaskCounts.size()) { + continue; + } + int taskCount = validTaskCounts.get(i); + if (taskCount < config.getTaskCountMin()) { + continue; + } else if (taskCount > config.getTaskCountMax()) { + break; + } + double cost = costFunction.computeCost(currentMetrics, taskCount, config); + log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost); + if (cost < optimalCost) { + optimalTaskCount = taskCount; + optimalCost = cost; + } + } + + System.err.println("Emitting " + optimalTaskCount); + // emitter.emit(metricBuilder.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, optimalTaskCount)); + emitter.emit(ServiceMetricEvent.builder() Review Comment: Do we need to emit this if AUTOSCALER_REQUIRED_TASKS_METRIC would be emitted by the supervisor anyway? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
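For reference on the hunks above: the valid task counts come from `computeFactors`, which the quoted ranges reference via `FACTORS_CACHE.computeIfAbsent` but do not show. A hedged sketch of what such a divisor computation could look like, illustrative rather than the PR's actual implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class FactorUtil
{
  private FactorUtil()
  {
  }

  /**
   * Returns the divisors of partitionCount in ascending order, e.g. 12 -> [1, 2, 3, 4, 6, 12].
   * Restricting task counts to divisors keeps partitions evenly distributed across tasks.
   */
  public static List<Integer> computeFactors(int partitionCount)
  {
    final List<Integer> factors = new ArrayList<>();
    for (int i = 1; i * i <= partitionCount; i++) {
      if (partitionCount % i == 0) {
        factors.add(i);
        if (i != partitionCount / i) {
          factors.add(partitionCount / i);
        }
      }
    }
    Collections.sort(factors);
    return factors;
  }
}
```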
