[ 
https://issues.apache.org/jira/browse/FLINK-39039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39039:
-----------------------------------
    Labels: pull-request-available  (was: )

> Expose JOSDK cacheSyncTimeout as operator config
> ------------------------------------------------
>
>                 Key: FLINK-39039
>                 URL: https://issues.apache.org/jira/browse/FLINK-39039
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: junzhong qin
>            Priority: Not a Priority
>              Labels: pull-request-available
>
>  
> There are thousands of Flink deployments in our K8s 
> cluster. When restarting the operator, we hit an Informer startup error. The 
> log is as follows:
> {code:java}
> // Operator start log
> 2026-02-06 10:56:30,688 INFO  io.javaoperatorsdk.operator.Operator [] [.] - Operator SDK 4.9.4 (commit: 3588780) built on 2024-08-30T00:52:57.000+0800 starting...
> 2026-02-06 10:56:30,689 INFO  io.javaoperatorsdk.operator.Operator [] [.] - Client version: 6.13.2
> 2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkdeploymentcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController, resource: org.apache.flink.kubernetes.operator.api.FlinkDeployment
> 2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkstatesnapshotcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController, resource: org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot
> 2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinksessionjobcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController, resource: org.apache.flink.kubernetes.operator.api.FlinkSessionJob
> 2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinksessionjobs' with unstable version 'v1beta1'
> 2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'
> 2026-02-06 10:58:30,739 ERROR io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper [] [.] - Informer startup error. Operator will be stopped. Informer: flink.apache.org/v1beta1/java.util.concurrent.TimeoutException: null
>         at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1892) ~[?:?]
>         at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2027) ~[?:?]
>         at io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper.start(InformerWrapper.java:88) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
>         at io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.lambda$start$0(InformerManager.java:62) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
>         at io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:61) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
>         at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>         at java.lang.Thread.run(Thread.java:829) [?:?]{code}
> The log shows that the informer startup error occurred 2 minutes after the 
> Operator SDK started, which matches the default cacheSyncTimeout of 2 minutes 
> given in the [Java Operator SDK 
> doc|https://javadoc.io/static/io.javaoperatorsdk/operator-framework-core/5.0.0/io/javaoperatorsdk/operator/api/config/ConfigurationService.html?utm_source=openai#cacheSyncTimeout()].
> So we want to introduce a new ConfigOption 
> `kubernetes.operator.startup.cache-sync-timeout` to make it configurable. 
>  
> {code:java}
> // KubernetesOperatorConfigOptions
> @Documentation.Section(SECTION_ADVANCED)
> public static final ConfigOption<Duration> OPERATOR_CACHE_SYNC_TIMEOUT =
>         operatorConfig("startup.cache-sync-timeout")
>                 .durationType()
>                 .defaultValue(Duration.ofMinutes(2))
>                 .withDescription("Cache sync timeout for JOSDK informer startup.");{code}
>  
> {code:java}
> // FlinkOperator#overrideOperatorConfigs()
> private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
>     overrider.withKubernetesClient(client);
>     var conf = configManager.getDefaultConfig();
>     var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
>     int parallelism = operatorConf.getReconcilerMaxParallelism();
>     if (parallelism == -1) {
>         LOG.info("Configuring operator with unbounded reconciliation thread pool.");
>         overrider.withExecutorService(Executors.newCachedThreadPool());
>     } else {
>         LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
>         overrider.withConcurrentReconciliationThreads(parallelism);
>     }
>     if (operatorConf.isJosdkMetricsEnabled()) {
>         overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
>     }
>     // Override the JOSDK cache sync timeout with the configured value
>     overrider.withCacheSyncTimeout(
>             conf.get(KubernetesOperatorConfigOptions.OPERATOR_CACHE_SYNC_TIMEOUT));
>     // ...
> }{code}
>  
> I’d like to work on this issue and contribute a PR. If the proposal is 
> accepted, I can open a PR and follow up with the updates.
>  
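
The stack trace in the issue ends in CompletableFuture.get with a timeout, so the failure is a bounded wait that expired before the informer cache finished syncing. The following is a minimal, self-contained sketch of that behaviour only, not the JOSDK implementation. The class name CacheSyncTimeoutSketch, the helper waitForSync, and the simulated 300 ms sync are all hypothetical and chosen for illustration. It shows why a longer timeout lets the same slow sync succeed.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; not part of JOSDK or the Flink Kubernetes Operator.
public class CacheSyncTimeoutSketch {

    /**
     * Waits for a (simulated) cache sync to complete, giving up after the timeout.
     * Returns true if the sync finished in time, false if the wait timed out.
     */
    static boolean waitForSync(CompletableFuture<Void> sync, Duration timeout)
            throws InterruptedException, ExecutionException {
        try {
            // Bounded wait: throws TimeoutException if the future is not done in time.
            sync.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated sync that takes about 300 ms (assumption for the demo).
        CompletableFuture<Void> slowSync = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // A 50 ms timeout is too short for the simulated sync.
        System.out.println(waitForSync(slowSync, Duration.ofMillis(50)));
        // A 5 s timeout gives the same sync enough time to finish.
        System.out.println(waitForSync(slowSync, Duration.ofSeconds(5)));
    }
}
```

With a configurable timeout, a cluster with many resources can be given a larger value instead of failing at the fixed 2 minute default.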



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
