This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 726e484c [FLINK-34566] Pass a FixedThreadPool to set reconciliation parallelism correctly
726e484c is described below
commit 726e484c6a9b4121563829bc094b3eebeb8ddcf3
Author: fengfei02 <[email protected]>
AuthorDate: Tue Mar 5 15:37:01 2024 +0800
[FLINK-34566] Pass a FixedThreadPool to set reconciliation parallelism correctly
---
.../org/apache/flink/kubernetes/operator/FlinkOperator.java | 2 +-
.../apache/flink/kubernetes/operator/FlinkOperatorTest.java | 13 ++++++++++++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 0ecd7c83..a5846f59 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -137,7 +137,7 @@ public class FlinkOperator {
overrider.withExecutorService(Executors.newCachedThreadPool());
} else {
LOG.info("Configuring operator with {} reconciliation threads.",
parallelism);
- overrider.withConcurrentReconciliationThreads(parallelism);
+ overrider.withExecutorService(Executors.newFixedThreadPool(parallelism));
}
if (operatorConf.isJosdkMetricsEnabled()) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index b60551a9..64550964 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -70,10 +70,21 @@ public class FlinkOperatorTest {
var configService =
testOperator.getOperator().getConfigurationService();
- // Test parallelism being passed
+ // Test parallelism being passed expectedly
var executorService = configService.getExecutorService();
Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
executorService;
+ for (int i = 0; i < testParallelism * 2; i++) {
+ threadPoolExecutor.execute(
+ () -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ Assertions.assertEquals(threadPoolExecutor.getPoolSize(),
testParallelism);
Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(),
testParallelism);
// Test label selector being passed