mxm commented on code in PR #744:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440293457
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##########
@@ -42,7 +42,7 @@ public class TestingEventCollector<KEY, Context extends JobAutoScalerContext<KEY
public final Map<String, Event<KEY, Context>> eventMap = new ConcurrentHashMap<>();
@Override
- public void handleEvent(
+ public synchronized void handleEvent(
Review Comment:
Is there a way to make this method thread-safe without synchronizing on its
entirety?
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##########
@@ -42,7 +42,7 @@ public class TestingEventCollector<KEY, Context extends JobAutoScalerContext<KEY
public final Map<String, Event<KEY, Context>> eventMap = new ConcurrentHashMap<>();
@Override
- public void handleEvent(
+ public synchronized void handleEvent(
Review Comment:
Maybe by using `Collections.synchronizedList(..)`?
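To make the suggestion concrete, here is a minimal sketch of a collector that stays thread-safe without a method-level `synchronized`. It is not the actual `TestingEventCollector`: the `SketchEventCollector` class, its simplified `Event` type, the `events` list, and the single-argument `handleEvent` are all assumptions made for illustration. The idea is that a synchronized list plus the atomic per-key operations of `ConcurrentHashMap` cover the shared state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SketchEventCollector {

    /** Hypothetical, simplified stand-in for the real Event type. */
    public static final class Event {
        public final String key;
        public final AtomicInteger count = new AtomicInteger();

        Event(String key) {
            this.key = key;
        }
    }

    // Thread-safe containers take the place of the method-level lock.
    public final List<Event> events = Collections.synchronizedList(new ArrayList<>());
    public final Map<String, Event> eventMap = new ConcurrentHashMap<>();

    public void handleEvent(String key) {
        // computeIfAbsent creates at most one Event per key, even under contention.
        Event event = eventMap.computeIfAbsent(key, Event::new);
        event.count.incrementAndGet();
        events.add(event);
    }

    /** Calls handleEvent from several threads and returns the filled collector. */
    public static SketchEventCollector runDemo(int threads, int callsPerThread, int distinctKeys) {
        SketchEventCollector collector = new SketchEventCollector();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    collector.handleEvent("key-" + (i % distinctKeys));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("demo tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return collector;
    }

    public static void main(String[] args) {
        SketchEventCollector c = runDemo(8, 1000, 10);
        System.out.println(c.events.size() + " events, " + c.eventMap.size() + " keys");
    }
}
```

The trade-off is that the list append and the map update are no longer one atomic step, which is fine for a collector that is only inspected after the producers have finished, but would matter if a test asserted on both while events are still arriving.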
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -38,6 +38,12 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
.withDeprecatedKeys("scalingInterval")
.withDescription("The interval of autoscaler standalone control loop.");
+ public static final ConfigOption<Integer> CONTROL_LOOP_PARALLELISM =
+ autoscalerStandaloneConfig("control-loop.parallelism")
+ .intType()
+ .defaultValue(100)
Review Comment:
FYI, the default for the Kubernetes operator is 200 but those two do not
have to be aligned.
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
private final AutoScalerEventHandler<KEY, Context> eventHandler;
private final JobAutoScaler<KEY, Context> autoScaler;
private final ScheduledExecutorService scheduledExecutorService;
+ private final ExecutorService scalingThreadPool;
public StandaloneAutoscalerExecutor(
- @Nonnull Duration scalingInterval,
+ @Nonnull Configuration conf,
@Nonnull JobListFetcher<KEY, Context> jobListFetcher,
@Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
@Nonnull JobAutoScaler<KEY, Context> autoScaler) {
- this.scalingInterval = scalingInterval;
+ this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
this.jobListFetcher = jobListFetcher;
this.eventHandler = eventHandler;
this.autoScaler = autoScaler;
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
- .setNameFormat("StandaloneAutoscalerControlLoop")
+ .setNameFormat("autoscaler-standalone-control-loop")
.setDaemon(false)
.build());
+
+ int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+ this.scalingThreadPool =
+ new ThreadPoolExecutor(
+ parallelism,
+ parallelism,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(parallelism * 4),
Review Comment:
Why `*4`? Shouldn't `parallelism` be enough for the work queue?
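For context on the queue-size question, the sketch below shows what a bounded work queue means for a fixed-size `ThreadPoolExecutor` that uses the JDK's default rejection policy: once all threads are busy and the queue is full, `submit` throws `RejectedExecutionException`. The class `BoundedQueueSketch` and the numbers used are illustrative assumptions. The hunk above is truncated, so the PR may pass a different rejection handler that changes this behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {

    /**
     * Submits `tasks` tasks that block until released, and returns how many
     * submissions were rejected under the default AbortPolicy.
     */
    public static int countRejections(int parallelism, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(
                        parallelism,
                        parallelism,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.submit(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    // All threads are busy and the queue is full.
                    rejected++;
                }
            }
        } finally {
            // Unblock the accepted tasks and let the pool drain.
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("queue = parallelism:     " + countRejections(2, 2, 10) + " rejected");
        System.out.println("queue = parallelism * 4: " + countRejections(2, 8, 10) + " rejected");
    }
}
```

With `parallelism` threads and a queue of capacity `c`, the pool accepts at most `parallelism + c` unfinished tasks at once. Since `scaling()` submits one task per fetched job, the queue capacity effectively bounds how many jobs a single control-loop pass can hand over before submissions start failing.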
##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -75,29 +98,38 @@ public void start() {
@Override
public void close() {
scheduledExecutorService.shutdownNow();
+ scalingThreadPool.shutdownNow();
}
@VisibleForTesting
protected void scaling() {
LOG.info("Standalone autoscaler starts scaling.");
try {
var jobList = jobListFetcher.fetch();
+ Collection<Future<?>> futures = new LinkedList<>();
for (var jobContext : jobList) {
- try {
- autoScaler.scale(jobContext);
- } catch (Throwable e) {
- LOG.error("Error while scaling job", e);
- eventHandler.handleEvent(
- jobContext,
- AutoScalerEventHandler.Type.Warning,
- AUTOSCALER_ERROR,
- e.getMessage(),
- null,
- null);
- }
+ futures.add(scalingThreadPool.submit(() -> scalingSingleJob(jobContext)));
+ }
+ for (Future<?> future : futures) {
+ future.get();
}
} catch (Throwable e) {
LOG.error("Error while fetch job list.", e);
}
Review Comment:
```suggestion
} catch (Throwable e) {
LOG.error("Error while executing autoscaling.", e);
}
```
Note that none of the individual submitted futures has its own error handling
here. I guess that is OK because the contract is that the autoscaler handles all
exceptions, so this catch block is just a safeguard that returns a better error
message.
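As a small illustration of that contract, the sketch below contrasts a task that handles its own errors with one that does not. In the second case the exception only surfaces at `future.get()`, wrapped in an `ExecutionException`, which is what the outer catch block would see. All names here (`FutureErrorSketch`, `scaleSingleJob`, the string log) are hypothetical and do not mirror the PR's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureErrorSketch {

    /** Hypothetical per-job task; jobs whose name starts with "bad" fail. */
    static void scaleSingleJob(String job, List<String> log, boolean handleErrors) {
        try {
            if (job.startsWith("bad")) {
                throw new IllegalStateException("cannot scale " + job);
            }
            log.add("scaled " + job);
        } catch (RuntimeException e) {
            if (!handleErrors) {
                throw e; // breaks the "task handles everything" contract
            }
            log.add("handled " + job + ": " + e.getMessage());
        }
    }

    public static List<String> runAll(List<String> jobs, boolean handleErrors) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String job : jobs) {
                futures.add(pool.submit(() -> scaleSingleJob(job, log, handleErrors)));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows a task failure as ExecutionException
            }
        } catch (ExecutionException e) {
            // Safeguard: reached only if a task let an exception escape.
            // Futures after the failing one are not awaited in this sketch.
            log.add("safeguard: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.add("interrupted");
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("job-a", "bad-job", "job-b"), true));
        System.out.println(runAll(List.of("bad-job"), false));
    }
}
```

When each task catches its own failures, one bad job does not stop the loop from waiting on the remaining futures. The outer catch then only matters for unexpected cases such as interruption or a bug in the task wrapper.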