1996fanrui commented on code in PR #744:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440326118
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##########
@@ -42,7 +42,7 @@ public class TestingEventCollector<KEY, Context extends JobAutoScalerContext<KEY
public final Map<String, Event<KEY, Context>> eventMap = new ConcurrentHashMap<>();
@Override
- public void handleEvent(
+ public synchronized void handleEvent(
Review Comment:
Some callers use the `poll` methods of `LinkedList`, and these methods come from `Queue`,
so I refactored it to `public final Queue<Event<KEY, Context>> events = new LinkedBlockingQueue<>();`.
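A minimal sketch of this refactoring, assuming the collector only needs a thread-safe `add` plus the `Queue#poll` calls the tests rely on. `EventQueueSketch`, `produceAndDrain`, and the `String` events (standing in for `Event<KEY, Context>`) are hypothetical. Because `LinkedBlockingQueue` is itself thread-safe, concurrent producers can call `handleEvent` without it being `synchronized`.

```java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TestingEventCollector: String replaces Event<KEY, Context>.
public class EventQueueSketch {
    // Thread-safe Queue, so concurrent handleEvent calls need no synchronized keyword.
    public final Queue<String> events = new LinkedBlockingQueue<>();

    public void handleEvent(String event) {
        events.add(event);
    }

    /** Fires events from several threads, then drains them with poll(); returns the drained count. */
    public static int produceAndDrain(int threads, int eventsPerThread) throws InterruptedException {
        EventQueueSketch collector = new EventQueueSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(
                    () -> {
                        for (int i = 0; i < eventsPerThread; i++) {
                            collector.handleEvent("thread-" + id + "-event-" + i);
                        }
                    });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        int drained = 0;
        // poll() is declared by Queue and returns null once the queue is empty.
        while (collector.events.poll() != null) {
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndDrain(4, 1000)); // prints 4000
    }
}
```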
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
private final AutoScalerEventHandler<KEY, Context> eventHandler;
private final JobAutoScaler<KEY, Context> autoScaler;
private final ScheduledExecutorService scheduledExecutorService;
+ private final ExecutorService scalingThreadPool;
public StandaloneAutoscalerExecutor(
- @Nonnull Duration scalingInterval,
+ @Nonnull Configuration conf,
@Nonnull JobListFetcher<KEY, Context> jobListFetcher,
@Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
@Nonnull JobAutoScaler<KEY, Context> autoScaler) {
- this.scalingInterval = scalingInterval;
+ this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
this.jobListFetcher = jobListFetcher;
this.eventHandler = eventHandler;
this.autoScaler = autoScaler;
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
- .setNameFormat("StandaloneAutoscalerControlLoop")
+ .setNameFormat("autoscaler-standalone-control-loop")
.setDaemon(false)
.build());
+
+ int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+ this.scalingThreadPool =
+ new ThreadPoolExecutor(
+ parallelism,
+ parallelism,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(parallelism * 4),
Review Comment:
First of all, why doesn't `scalingThreadPool` use an unbounded `LinkedBlockingQueue`?
My thought is that if all threads are busy and the `LinkedBlockingQueue` already holds a lot of
tasks, the control loop scheduler thread can execute the task itself. That's why I chose
`new ThreadPoolExecutor.CallerRunsPolicy()`.
> Why *4?
The factor isn't strict. My intent is that the scheduler thread only runs tasks when the
`LinkedBlockingQueue` already holds a lot of tasks, so `parallelism * 3` or any factor greater than 3 would also make sense.
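The bounded-queue plus `CallerRunsPolicy` idea can be sketched as below. This is a hypothetical, self-contained illustration, not the PR's code: `CallerRunsSketch`, `runJobs`, and `simulateScaling` are made-up names, and a 2 ms sleep stands in for one job's scaling work. Once all `parallelism` threads are busy and the queue holds `parallelism * 4` tasks, the next `submit` is rejected and `CallerRunsPolicy` runs that task on the submitting (control loop) thread, which also slows down further submissions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a scaling pool with a bounded queue and CallerRunsPolicy.
public class CallerRunsSketch {

    /** Returns {completedJobs, jobsRunOnCallerThread}. */
    public static int[] runJobs(int parallelism, int jobs) throws Exception {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(
                        parallelism,
                        parallelism,
                        0L,
                        TimeUnit.MILLISECONDS,
                        // Bounded queue: once it holds parallelism * 4 tasks, new tasks are rejected...
                        new LinkedBlockingQueue<>(parallelism * 4),
                        // ...and CallerRunsPolicy runs each rejected task on the submitting thread.
                        new ThreadPoolExecutor.CallerRunsPolicy());
        Thread controlLoop = Thread.currentThread();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger ranOnCaller = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < jobs; i++) {
                futures.add(
                        pool.submit(
                                () -> {
                                    if (Thread.currentThread() == controlLoop) {
                                        ranOnCaller.incrementAndGet();
                                    }
                                    simulateScaling();
                                    completed.incrementAndGet();
                                }));
            }
            // Wait for every job, whether it ran on a pool thread or on the caller.
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            pool.shutdownNow();
        }
        return new int[] {completed.get(), ranOnCaller.get()};
    }

    // Stand-in for the work of scaling one job.
    private static void simulateScaling() {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = runJobs(2, 50);
        System.out.println("completed=" + result[0] + ", ranOnCaller=" + result[1]);
    }
}
```

How many jobs end up on the caller thread depends on timing, so `ranOnCaller` varies between runs; only `completed` is deterministic.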
-------------------------------------------
Of course, if you think `scalingThreadPool` already has enough threads and the control loop
scheduler thread doesn't need to execute any tasks, we can let the control loop scheduler thread
only submit tasks and wait for all of them to finish, so it never executes tasks itself.
WDYT?
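The submit-only alternative could look like the sketch below, assuming an unbounded queue is acceptable. The names (`SubmitAndWaitSketch`, `runJobs`) are hypothetical. `Executors.newFixedThreadPool` is backed by an unbounded `LinkedBlockingQueue`, so submissions are never rejected, and `ExecutorService#invokeAll` blocks until every task is done, so the control loop thread only submits and waits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the control loop thread only submits jobs and waits for them.
public class SubmitAndWaitSketch {

    /** Returns {completedJobs, jobsRunOnCallerThread}. */
    public static int[] runJobs(int parallelism, int jobs) throws InterruptedException {
        // newFixedThreadPool uses an unbounded LinkedBlockingQueue, so submissions are never rejected.
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        Thread controlLoop = Thread.currentThread();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger ranOnCaller = new AtomicInteger();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < jobs; i++) {
            tasks.add(
                    () -> {
                        if (Thread.currentThread() == controlLoop) {
                            ranOnCaller.incrementAndGet();
                        }
                        completed.incrementAndGet();
                        return null;
                    });
        }
        try {
            // invokeAll blocks until every task has completed, normally or exceptionally.
            pool.invokeAll(tasks);
        } finally {
            pool.shutdownNow();
        }
        return new int[] {completed.get(), ranOnCaller.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = runJobs(4, 50);
        System.out.println("completed=" + result[0] + ", ranOnCaller=" + result[1]);
        // prints completed=50, ranOnCaller=0
    }
}
```

The trade-off: the queue can grow without limit when there are many more jobs than threads, whereas `CallerRunsPolicy` caps the queue and pushes back on the control loop.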
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -75,29 +98,38 @@ public void start() {
@Override
public void close() {
scheduledExecutorService.shutdownNow();
+ scalingThreadPool.shutdownNow();
}
@VisibleForTesting
protected void scaling() {
LOG.info("Standalone autoscaler starts scaling.");
try {
var jobList = jobListFetcher.fetch();
+ Collection<Future<?>> futures = new LinkedList<>();
for (var jobContext : jobList) {
- try {
- autoScaler.scale(jobContext);
- } catch (Throwable e) {
- LOG.error("Error while scaling job", e);
- eventHandler.handleEvent(
- jobContext,
- AutoScalerEventHandler.Type.Warning,
- AUTOSCALER_ERROR,
- e.getMessage(),
- null,
- null);
- }
+ futures.add(scalingThreadPool.submit(() -> scalingSingleJob(jobContext)));
+ }
+ for (Future<?> future : futures) {
+ future.get();
}
} catch (Throwable e) {
LOG.error("Error while fetch job list.", e);
}
Review Comment:
In the current code, `scalingThreadPool` executes the `scalingSingleJob` logic,
and `scalingSingleJob` catches all exceptions. So I think every exception caught by the
control loop thread comes from fetching the job list.
WDYT?
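A hypothetical sketch of that reasoning: the class, method, and job names are made up, `String` stands in for the job context, and recording into a queue stands in for `eventHandler.handleEvent`. Because the per-job task catches every `Throwable`, `Future#get` does not surface per-job failures, so the outer `catch` only sees failures from fetching the job list (or an `InterruptedException` while waiting).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical sketch: per-job errors are handled inside the task, so the
// control loop's catch block only sees job-list fetch failures (or interruption).
public class ControlLoopErrorSketch {

    // Stand-in for scalingSingleJob: catches every Throwable and records a warning.
    static void scalingSingleJob(String job, Queue<String> warnings) {
        try {
            if (job.startsWith("bad")) {
                throw new IllegalStateException("Scaling failed for " + job);
            }
        } catch (Throwable e) {
            // The PR emits an AUTOSCALER_ERROR warning event here; this sketch just records it.
            warnings.add(e.getMessage());
        }
    }

    /** Returns the message caught by the control loop, or null if nothing reached it. */
    static String scaling(
            ExecutorService pool, Supplier<List<String>> fetcher, Queue<String> warnings) {
        try {
            List<String> jobs = fetcher.get();
            List<Future<?>> futures = new ArrayList<>();
            for (String job : jobs) {
                futures.add(pool.submit(() -> scalingSingleJob(job, warnings)));
            }
            for (Future<?> future : futures) {
                // No ExecutionException expected: scalingSingleJob swallows its own errors.
                future.get();
            }
            return null;
        } catch (Throwable e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Queue<String> warnings = new ConcurrentLinkedQueue<>();
        System.out.println(scaling(pool, () -> List.of("job-1", "bad-job", "job-2"), warnings)); // prints null
        System.out.println(warnings.size()); // prints 1
        System.out.println(
                scaling(pool, () -> { throw new IllegalStateException("fetch failed"); }, warnings)); // prints fetch failed
        pool.shutdown();
    }
}
```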
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -38,6 +38,12 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
.withDeprecatedKeys("scalingInterval")
.withDescription("The interval of autoscaler standalone control loop.");
+ public static final ConfigOption<Integer> CONTROL_LOOP_PARALLELISM =
+ autoscalerStandaloneConfig("control-loop.parallelism")
+ .intType()
+ .defaultValue(100)
Review Comment:
Yeah, the Kubernetes operator uses 200 by default.
Do you think 100 is enough? Aligning them is fine with me.
--
This is an automated message from the Apache Git Service.