1996fanrui commented on code in PR #744:
URL: https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440326118


##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java:
##########
@@ -42,7 +42,7 @@ public class TestingEventCollector<KEY, Context extends JobAutoScalerContext<KEY
     public final Map<String, Event<KEY, Context>> eventMap = new ConcurrentHashMap<>();
 
     @Override
-    public void handleEvent(
+    public synchronized void handleEvent(

Review Comment:
   Some callers use the `poll` methods of `LinkedList`. These methods come from 
`Queue`, so I refactored the field to `public final Queue<Event<KEY, Context>> 
events = new LinkedBlockingQueue<>();`.
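
To illustrate the refactoring, here is a minimal sketch. It assumes a hypothetical `QueueBackedCollector` class and uses `String` in place of `Event<KEY, Context>`; it is not the actual `TestingEventCollector`. Declaring the field as `Queue` keeps `poll()` available to callers, while `LinkedBlockingQueue` makes concurrent `add` calls safe.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the test collector; String replaces Event<KEY, Context>.
class QueueBackedCollector {
    // Typed as Queue so existing poll() callers keep compiling;
    // LinkedBlockingQueue is safe for concurrent producers.
    final Queue<String> events = new LinkedBlockingQueue<>();

    void handleEvent(String message) {
        events.add(message);
    }

    public static void main(String[] args) {
        QueueBackedCollector collector = new QueueBackedCollector();
        collector.handleEvent("first");
        collector.handleEvent("second");
        // poll() returns elements in insertion order and null once the queue is empty.
        System.out.println(collector.events.poll());
        System.out.println(collector.events.poll());
        System.out.println(collector.events.poll());
    }
}
```

Note that `Queue` only covers `poll()`, `peek()` and similar methods. Callers that rely on `Deque` methods of `LinkedList`, such as `pollLast()`, would need a different type.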



##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
     private final AutoScalerEventHandler<KEY, Context> eventHandler;
     private final JobAutoScaler<KEY, Context> autoScaler;
     private final ScheduledExecutorService scheduledExecutorService;
+    private final ExecutorService scalingThreadPool;
 
     public StandaloneAutoscalerExecutor(
-            @Nonnull Duration scalingInterval,
+            @Nonnull Configuration conf,
             @Nonnull JobListFetcher<KEY, Context> jobListFetcher,
             @Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
             @Nonnull JobAutoScaler<KEY, Context> autoScaler) {
-        this.scalingInterval = scalingInterval;
+        this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
         this.jobListFetcher = jobListFetcher;
         this.eventHandler = eventHandler;
         this.autoScaler = autoScaler;
         this.scheduledExecutorService =
                 Executors.newSingleThreadScheduledExecutor(
                         new ThreadFactoryBuilder()
-                                .setNameFormat("StandaloneAutoscalerControlLoop")
+                                .setNameFormat("autoscaler-standalone-control-loop")
                                 .setDaemon(false)
                                 .build());
+
+        int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+        this.scalingThreadPool =
+                new ThreadPoolExecutor(
+                        parallelism,
+                        parallelism,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   First of all, why doesn't this `scalingThreadPool` use an unbounded 
`LinkedBlockingQueue`?
   
   My thought is that if all threads are busy and the `LinkedBlockingQueue` 
already holds a lot of tasks, we can execute the task in the control loop 
scheduler thread. That is why I chose `new ThreadPoolExecutor.CallerRunsPolicy()`.
   
   > Why *4?
   
   The factor is not strict. My thought is that the scheduler thread should only 
do work when the `LinkedBlockingQueue` holds a lot of tasks, so a capacity of 
`parallelism * 3` or more makes sense.
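
A minimal sketch of the behavior described above, assuming a hypothetical `CallerRunsSketch` class (not code from this PR): once every worker is busy and the bounded queue is full, `CallerRunsPolicy` runs the rejected task in the submitting thread. The latch exists only to make the demo deterministic by keeping workers busy until the caller has run one task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class CallerRunsSketch {

    /** Submits taskCount tasks and returns the names of all threads that executed one. */
    static Set<String> runTasks(int parallelism, int taskCount) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(
                        parallelism,
                        parallelism,
                        0L,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(parallelism * 4),
                        new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        Thread caller = Thread.currentThread();
        CountDownLatch callerRan = new CountDownLatch(1);
        Runnable task = () -> {
            threadNames.add(Thread.currentThread().getName());
            if (Thread.currentThread() == caller) {
                callerRan.countDown();
                return;
            }
            try {
                // Demo only: keep workers busy so the bounded queue fills up.
                callerRan.await(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                // When the workers and the queue are full, this call runs the task inline.
                futures.add(pool.submit(task));
            }
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        // With parallelism 1 the queue holds 4 tasks, so the 6th submission runs on the caller.
        System.out.println(runTasks(1, 10));
    }
}
```

With an unbounded queue the rejection handler would never fire, so the submitting thread would only ever wait. The bounded capacity is what lets the scheduler thread help out under load.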
   
   -------------------------------------------
   
   Of course, if you think `scalingThreadPool` already has enough threads and 
we don't need the control loop scheduler thread to execute any tasks, we can 
let the control loop scheduler thread only submit tasks and wait for all of 
them to finish. That means the control loop scheduler thread never executes 
tasks itself.
   
   WDYT?



##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -75,29 +98,38 @@ public void start() {
     @Override
     public void close() {
         scheduledExecutorService.shutdownNow();
+        scalingThreadPool.shutdownNow();
     }
 
     @VisibleForTesting
     protected void scaling() {
         LOG.info("Standalone autoscaler starts scaling.");
         try {
             var jobList = jobListFetcher.fetch();
+            Collection<Future<?>> futures = new LinkedList<>();
             for (var jobContext : jobList) {
-                try {
-                    autoScaler.scale(jobContext);
-                } catch (Throwable e) {
-                    LOG.error("Error while scaling job", e);
-                    eventHandler.handleEvent(
-                            jobContext,
-                            AutoScalerEventHandler.Type.Warning,
-                            AUTOSCALER_ERROR,
-                            e.getMessage(),
-                            null,
-                            null);
-                }
+                futures.add(scalingThreadPool.submit(() -> scalingSingleJob(jobContext)));
+            }
+            for (Future<?> future : futures) {
+                future.get();
             }
         } catch (Throwable e) {
             LOG.error("Error while fetch job list.", e);
         }

Review Comment:
   In the current code, `scalingThreadPool` executes the `scalingSingleJob` 
logic, and `scalingSingleJob` catches all exceptions. So I think all exceptions 
in the control loop thread come from fetching the job list.
   
   WDYT?
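
A small sketch of the reasoning, assuming a hypothetical `FutureExceptionSketch` class (the real `scalingSingleJob` is not shown in this hunk): `future.get()` rethrows a task failure as `ExecutionException` only if the task lets the exception escape. A task that catches everything internally completes normally from the caller's point of view.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are illustrative only.
class FutureExceptionSketch {

    /** Returns true if future.get() surfaced an ExecutionException to the submitting thread. */
    static boolean getThrows(Runnable task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(task);
            future.get();
            return false;
        } catch (ExecutionException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    /** Wraps a body the way a catch-all task would: failures are handled inside the task. */
    static Runnable catchingAll(Runnable body) {
        return () -> {
            try {
                body.run();
            } catch (Throwable t) {
                // A real task would log and report the error here instead of rethrowing.
            }
        };
    }

    public static void main(String[] args) {
        Runnable failing = () -> {
            throw new IllegalStateException("boom");
        };
        System.out.println(getThrows(failing));              // true
        System.out.println(getThrows(catchingAll(failing))); // false
    }
}
```

One caveat: `future.get()` can still throw `InterruptedException` if the waiting thread is interrupted, for example during shutdown, so the outer `catch` may see more than fetch failures.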



##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java:
##########
@@ -38,6 +38,12 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
                     .withDeprecatedKeys("scalingInterval")
                     .withDescription("The interval of autoscaler standalone control loop.");
 
+    public static final ConfigOption<Integer> CONTROL_LOOP_PARALLELISM =
+            autoscalerStandaloneConfig("control-loop.parallelism")
+                    .intType()
+                    .defaultValue(100)

Review Comment:
   Yeah, the Kubernetes operator uses 200 by default.
   
   Do you think 100 is enough? Aligning them is fine with me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
