junzhong qin created FLINK-39039:
------------------------------------

             Summary: Expose JOSDK cacheSyncTimeout as operator config
                 Key: FLINK-39039
                 URL: https://issues.apache.org/jira/browse/FLINK-39039
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
            Reporter: junzhong qin


 

There are thousands of Flink deployments in our K8s cluster. 
When restarting the operator, we hit an Informer startup error. The log is as 
follows:
{code:java}
// Operator start log
2026-02-06 10:56:30,688 INFO  io.javaoperatorsdk.operator.Operator [] [.] - Operator SDK 4.9.4 (commit: 3588780) built on 2024-08-30T00:52:57.000+0800 starting...
2026-02-06 10:56:30,689 INFO  io.javaoperatorsdk.operator.Operator [] [.] - Client version: 6.13.2
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkdeploymentcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController, resource: org.apache.flink.kubernetes.operator.api.FlinkDeployment
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkstatesnapshotcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController, resource: org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinksessionjobcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController, resource: org.apache.flink.kubernetes.operator.api.FlinkSessionJob
2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinksessionjobs' with unstable version 'v1beta1'
2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'
2026-02-06 10:58:30,739 ERROR io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper [] [.] - Informer startup error. Operator will be stopped. Informer: flink.apache.org/v1beta1/flinkdeployments
java.util.concurrent.TimeoutException: null
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1892) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2027) ~[?:?]
        at io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper.start(InformerWrapper.java:88) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.lambda$start$0(InformerManager.java:62) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:61) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]
{code}
The log shows that the informer startup error occurred at 10:58:30, exactly 
2 minutes after the Operator SDK started at 10:56:30. According to the [Java Operator SDK 
doc|https://javadoc.io/static/io.javaoperatorsdk/operator-framework-core/5.0.0/io/javaoperatorsdk/operator/api/config/ConfigurationService.html?utm_source=openai#cacheSyncTimeout()],
 the default cacheSyncTimeout is 2 minutes, so the informer cache sync is 
timing out before it can finish.
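
The "TimeoutException: null" in the stack trace is what a timed CompletableFuture.get produces when the deadline passes, since the JDK throws that exception without a message. The following is a minimal, self-contained sketch of that behavior only. The class name TimedGetDemo and the helper waitForSync are hypothetical names made up for illustration; this is not the JOSDK InformerWrapper code.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of JOSDK or the Flink operator.
class TimedGetDemo {

    /**
     * Waits for a (simulated) cache sync for at most the given timeout.
     * Returns "synced" on success, or a string describing the timeout.
     */
    static String waitForSync(CompletableFuture<Void> sync, Duration timeout)
            throws InterruptedException, ExecutionException {
        try {
            sync.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return "synced";
        } catch (TimeoutException e) {
            // The JDK creates this exception without a message, so getMessage() is null.
            return "TimeoutException: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        // A future that never completes stands in for a sync that is too slow.
        CompletableFuture<Void> neverSynced = new CompletableFuture<>();
        System.out.println(waitForSync(neverSynced, Duration.ofMillis(200)));

        // An already completed future stands in for a sync that finishes in time.
        CompletableFuture<Void> alreadySynced = CompletableFuture.completedFuture(null);
        System.out.println(waitForSync(alreadySynced, Duration.ofMillis(200)));
    }
}
```

With thousands of resources to list, the initial sync can plausibly take longer than the fixed deadline, which is why a longer, configurable timeout would help here.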



We therefore propose introducing a new ConfigOption, 
`kubernetes.operator.startup.cache-sync-timeout`, to make this timeout configurable. 

 
{code:java}
// KubernetesOperatorConfigOptions

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_CACHE_SYNC_TIMEOUT =
        operatorConfig("startup.cache-sync-timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(2))
                .withDescription("Cache sync timeout for JOSDK informer startup.");{code}
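
The intended semantics of the option can be sketched without any Flink dependency: use the configured value if the key is present, otherwise fall back to the 2-minute default, so existing deployments keep today's behavior. This is only an illustration under stated assumptions. The class CacheSyncTimeoutLookup and its resolve method are hypothetical, and the sketch parses ISO-8601 strings such as "PT10M" with java.time.Duration, whereas Flink's own ConfigOption handling does the real parsing in the operator.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical stand-in for the config lookup; not Flink's Configuration API.
class CacheSyncTimeoutLookup {

    // Key proposed in this issue.
    static final String KEY = "kubernetes.operator.startup.cache-sync-timeout";

    // Same default as the proposed ConfigOption (the JOSDK default of 2 minutes).
    static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(2);

    /** Returns the configured timeout, or the default when the key is absent. */
    static Duration resolve(Map<String, String> conf) {
        String raw = conf.get(KEY);
        // Assumption: values are ISO-8601 durations, chosen only to keep the sketch self-contained.
        return raw == null ? DEFAULT_TIMEOUT : Duration.parse(raw);
    }

    public static void main(String[] args) {
        // No override: the default applies.
        System.out.println(resolve(Map.of()));             // prints PT2M
        // Override for a large cluster.
        System.out.println(resolve(Map.of(KEY, "PT10M"))); // prints PT10M
    }
}
```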
 

 

 
{code:java}
// FlinkOperator#overrideOperatorConfigs()
private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
    overrider.withKubernetesClient(client);
    var conf = configManager.getDefaultConfig();
    var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
    int parallelism = operatorConf.getReconcilerMaxParallelism();
    if (parallelism == -1) {
        LOG.info("Configuring operator with unbounded reconciliation thread pool.");
        overrider.withExecutorService(Executors.newCachedThreadPool());
    } else {
        LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
        overrider.withConcurrentReconciliationThreads(parallelism);
    }

    if (operatorConf.isJosdkMetricsEnabled()) {
        overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
    }
    // override cache sync timeout
    overrider.withCacheSyncTimeout(
            conf.get(KubernetesOperatorConfigOptions.OPERATOR_CACHE_SYNC_TIMEOUT));
    // ...
}{code}
 

I’d like to work on this issue and contribute a PR. If the proposal is 
accepted, I can open a PR and follow up with the updates.

 



