[
https://issues.apache.org/jira/browse/FLINK-39039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
junzhong qin updated FLINK-39039:
---------------------------------
Description:
There are thousands of Flink deployments in our K8s cluster.
When restarting the operator, we hit an Informer startup error. The log is as
follows:
{code:java}
// Operator start log
2026-02-06 10:56:30,688 INFO io.javaoperatorsdk.operator.Operator [] [.] - Operator SDK 4.9.4 (commit: 3588780) built on 2024-08-30T00:52:57.000+0800 starting...
2026-02-06 10:56:30,689 INFO io.javaoperatorsdk.operator.Operator [] [.] - Client version: 6.13.2
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkdeploymentcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController, resource: org.apache.flink.kubernetes.operator.api.FlinkDeployment
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkstatesnapshotcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController, resource: org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinksessionjobcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController, resource: org.apache.flink.kubernetes.operator.api.FlinkSessionJob
2026-02-06 10:56:30,734 WARN io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinksessionjobs' with unstable version 'v1beta1'
2026-02-06 10:56:30,734 WARN io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'
2026-02-06 10:58:30,739 ERROR io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper [] [.] - Informer startup error. Operator will be stopped. Informer: flink.apache.org/v1beta1/flinkdeployments
java.util.concurrent.TimeoutException: null
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1892) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2027) ~[?:?]
    at io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper.start(InformerWrapper.java:88) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.lambda$start$0(InformerManager.java:62) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:61) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
{code}
The log shows that the informer startup error occurred 2 minutes after the
Operator SDK started (10:56:30 to 10:58:30). According to the [Java Operator SDK
doc|https://javadoc.io/static/io.javaoperatorsdk/operator-framework-core/5.0.0/io/javaoperatorsdk/operator/api/config/ConfigurationService.html?utm_source=openai#cacheSyncTimeout()],
the default cacheSyncTimeout is 2 minutes, which appears to be too short for
the informer to sync this many resources.
So we want to introduce a new ConfigOption
`kubernetes.operator.startup.cache-sync-timeout` to make it configurable.
{code:java}
// KubernetesOperatorConfigOptions
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_CACHE_SYNC_TIMEOUT =
        operatorConfig("startup.cache-sync-timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(2))
                .withDescription("Cache sync timeout for JOSDK informer startup.");
{code}
{code:java}
// FlinkOperator#overrideOperatorConfigs()
private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
    overrider.withKubernetesClient(client);
    var conf = configManager.getDefaultConfig();
    var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
    int parallelism = operatorConf.getReconcilerMaxParallelism();
    if (parallelism == -1) {
        LOG.info("Configuring operator with unbounded reconciliation thread pool.");
        overrider.withExecutorService(Executors.newCachedThreadPool());
    } else {
        LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
        overrider.withConcurrentReconciliationThreads(parallelism);
    }
    if (operatorConf.isJosdkMetricsEnabled()) {
        overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
    }
    // override cache sync timeout
    overrider.withCacheSyncTimeout(
            conf.get(KubernetesOperatorConfigOptions.OPERATOR_CACHE_SYNC_TIMEOUT));
    // ...
}
{code}
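For illustration, the failure mode in the stack trace (a timed CompletableFuture.get() inside InformerWrapper.start) can be reproduced with a small standalone sketch. This is not JOSDK code: the class CacheSyncTimeoutSketch and the method syncsWithin are hypothetical names, and the millisecond durations are scaled-down stand-ins for the real sync time and the 2-minute default. It only shows that the same simulated sync fails under a short timeout and succeeds once the timeout is raised.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an informer start; not part of JOSDK or the operator.
final class CacheSyncTimeoutSketch {

    /** Returns true if a simulated cache sync that takes syncTime finishes within timeout. */
    static boolean syncsWithin(Duration syncTime, Duration timeout) {
        // Simulated initial sync: a future that completes after syncTime has elapsed.
        CompletableFuture<Void> synced =
                CompletableFuture.runAsync(
                        () -> {},
                        CompletableFuture.delayedExecutor(
                                syncTime.toMillis(), TimeUnit.MILLISECONDS));
        try {
            // Timed wait, analogous to the CompletableFuture.get(...) frame in the trace.
            synced.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The sync did not finish in time; the real operator stops at this point.
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Duration syncTime = Duration.ofMillis(300); // assumed time needed to sync
        System.out.println(syncsWithin(syncTime, Duration.ofMillis(50))); // false
        System.out.println(syncsWithin(syncTime, Duration.ofSeconds(5))); // true
    }
}
```

With the proposed option, the second case would correspond to setting `kubernetes.operator.startup.cache-sync-timeout` to a value larger than the time the informer needs to list all resources.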
I’d like to work on this issue and contribute a PR. If the proposal is
accepted, I can open a PR and follow up with the updates.
> Expose JOSDK cacheSyncTimeout as operator config
> ------------------------------------------------
>
> Key: FLINK-39039
> URL: https://issues.apache.org/jira/browse/FLINK-39039
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Reporter: junzhong qin
> Priority: Not a Priority
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)