This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 726e484c [FLINK-34566] Pass a FixedThreadPool to set reconciliation 
parallelism correctly
726e484c is described below

commit 726e484c6a9b4121563829bc094b3eebeb8ddcf3
Author: fengfei02 <[email protected]>
AuthorDate: Tue Mar 5 15:37:01 2024 +0800

    [FLINK-34566] Pass a FixedThreadPool to set reconciliation parallelism 
correctly
---
 .../org/apache/flink/kubernetes/operator/FlinkOperator.java |  2 +-
 .../apache/flink/kubernetes/operator/FlinkOperatorTest.java | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 0ecd7c83..a5846f59 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -137,7 +137,7 @@ public class FlinkOperator {
             overrider.withExecutorService(Executors.newCachedThreadPool());
         } else {
             LOG.info("Configuring operator with {} reconciliation threads.", 
parallelism);
-            overrider.withConcurrentReconciliationThreads(parallelism);
+            
overrider.withExecutorService(Executors.newFixedThreadPool(parallelism));
         }
 
         if (operatorConf.isJosdkMetricsEnabled()) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index b60551a9..64550964 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -70,10 +70,21 @@ public class FlinkOperatorTest {
 
         var configService = 
testOperator.getOperator().getConfigurationService();
 
-        // Test parallelism being passed
+        // Test parallelism being passed expectedly
         var executorService = configService.getExecutorService();
         Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
         ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) 
executorService;
+        for (int i = 0; i < testParallelism * 2; i++) {
+            threadPoolExecutor.execute(
+                    () -> {
+                        try {
+                            Thread.sleep(1000);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    });
+        }
+        Assertions.assertEquals(threadPoolExecutor.getPoolSize(), 
testParallelism);
         Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(), 
testParallelism);
 
         // Test label selector being passed

Reply via email to