1996fanrui commented on code in PR #744:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/744#discussion_r1440355436


##########
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java:
##########
@@ -48,22 +59,34 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     private final AutoScalerEventHandler<KEY, Context> eventHandler;
     private final JobAutoScaler<KEY, Context> autoScaler;
     private final ScheduledExecutorService scheduledExecutorService;
+    private final ExecutorService scalingThreadPool;
 
     public StandaloneAutoscalerExecutor(
-            @Nonnull Duration scalingInterval,
+            @Nonnull Configuration conf,
             @Nonnull JobListFetcher<KEY, Context> jobListFetcher,
             @Nonnull AutoScalerEventHandler<KEY, Context> eventHandler,
             @Nonnull JobAutoScaler<KEY, Context> autoScaler) {
-        this.scalingInterval = scalingInterval;
+        this.scalingInterval = conf.get(CONTROL_LOOP_INTERVAL);
         this.jobListFetcher = jobListFetcher;
         this.eventHandler = eventHandler;
         this.autoScaler = autoScaler;
         this.scheduledExecutorService =
                 Executors.newSingleThreadScheduledExecutor(
                         new ThreadFactoryBuilder()
-                                
.setNameFormat("StandaloneAutoscalerControlLoop")
+                                
.setNameFormat("autoscaler-standalone-control-loop")
                                 .setDaemon(false)
                                 .build());
+
+        int parallelism = conf.get(CONTROL_LOOP_PARALLELISM);
+        this.scalingThreadPool =
+                new ThreadPoolExecutor(
+                        parallelism,
+                        parallelism,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(parallelism * 4),

Review Comment:
   > What is the benefit of the queue? If we are waiting on all tasks to 
finish, then we do not need the queue.
   
   Sounds make sense, if we want to waiting, the queue isn't useful.
   
   > I think the current logic of waiting for all tasks to finish is a bit 
fragile. For example, when we are executing a scaling decision, we may block 
for a long time on savepointing which will halt the next scaling for all 
pipelines. We could alternatively not block all but only the specific job for 
the next scaling.
   
   Sorry, I didn't notice the `scalingSingleJob` will cost too many times. 
Currently, the scaling action only call the rescale api, so the thread won't be 
wait the savepointing.
   
   But I think your concern is reasonable. We define the `ScalingRealizer` 
interface, it means users can custom define the action. It might cost long time.
   
   --------------------------------------------------------
   
   
   Based on your suggestion, I think the `control loop scheduler thread` cannot 
wait all scaling. When the scaling of one job is slow, it shouldn't affect 
other jobs.
   
   Also, if a new scaling round is started, and the last scaling of some of 
jobs aren't finished. Should we ignore this round for these jobs? Some reasons 
might cause this issues:
   
   - The scaling action is slow
   - It has too many jobs, and the thread number isn't enough. So the last 
round always cannot be finished when the new round is started.
   
   So, I think we should ignore unfinished jobs in the new round.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to