[ 
https://issues.apache.org/jira/browse/FLINK-39039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

junzhong qin updated FLINK-39039:
---------------------------------
    Description: 
 

There are thousands of Flink deployments in our K8s cluster. 
When restarting the operator, we hit an Informer startup error. The log is as 
follows:
{code:java}
// Operator start log
2026-02-06 10:56:30,688 INFO  io.javaoperatorsdk.operator.Operator                         [] [.] - Operator SDK 4.9.4 (commit: 3588780) built on 2024-08-30T00:52:57.000+0800 starting...
2026-02-06 10:56:30,689 INFO  io.javaoperatorsdk.operator.Operator                         [] [.] - Client version: 6.13.2
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller            [] [.] - Starting 'flinkdeploymentcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController, resource: org.apache.flink.kubernetes.operator.api.FlinkDeployment
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller            [] [.] - Starting 'flinkstatesnapshotcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController, resource: org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot
2026-02-06 10:56:30,691 INFO  io.javaoperatorsdk.operator.processing.Controller            [] [.] - Starting 'flinksessionjobcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController, resource: org.apache.flink.kubernetes.operator.api.FlinkSessionJob
2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils  [] [.] - The client is using resource type 'flinksessionjobs' with unstable version 'v1beta1'
2026-02-06 10:56:30,734 WARN  io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils  [] [.] - The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'
2026-02-06 10:58:30,739 ERROR io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper [] [.] - Informer startup error. Operator will be stopped. Informer: flink.apache.org/v1beta1/flinkdeployments
java.util.concurrent.TimeoutException: null
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1892) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2027) ~[?:?]
        at io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper.start(InformerWrapper.java:88) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.lambda$start$0(InformerManager.java:62) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:61) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]{code}
The log shows that the informer startup error occurred exactly 2 minutes after 
the Operator SDK started (10:56:30 to 10:58:30). This matches the default 
cacheSyncTimeout of 2 minutes described in the [Java Operator SDK 
doc|https://javadoc.io/static/io.javaoperatorsdk/operator-framework-core/5.0.0/io/javaoperatorsdk/operator/api/config/ConfigurationService.html?utm_source=openai#cacheSyncTimeout()].
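
The stack trace shows the timeout surfacing from a timed CompletableFuture.get 
call inside InformerWrapper.start. The following is a minimal, self-contained 
sketch of that waiting pattern using only the JDK. It is not the JOSDK 
implementation: the class name CacheSyncTimeoutDemo, the helper waitForSync, 
and the 200 ms timeout are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; it only mirrors the "timed get" pattern seen in the stack trace.
final class CacheSyncTimeoutDemo {

    // Returns true if the future completes within the timeout, false if the wait times out.
    static boolean waitForSync(CompletableFuture<?> syncFuture, Duration timeout) {
        try {
            syncFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // This is the exception type reported in the operator log.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for sync", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Sync failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that never completes stands in for a cache that is still syncing.
        CompletableFuture<Void> neverSynced = new CompletableFuture<>();
        System.out.println(waitForSync(neverSynced, Duration.ofMillis(200))); // prints false

        // An already completed future stands in for a cache that synced in time.
        CompletableFuture<Void> alreadySynced = CompletableFuture.completedFuture(null);
        System.out.println(waitForSync(alreadySynced, Duration.ofMillis(200))); // prints true
    }
}
```

With a fixed timeout, a cache that needs longer than the limit to sync always 
fails startup, which is why a larger, configurable value helps clusters with 
many resources.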

We therefore propose a new ConfigOption 
`kubernetes.operator.startup.cache-sync-timeout` to make this timeout 
configurable. 

 
{code:java}
// KubernetesOperatorConfigOptions

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_CACHE_SYNC_TIMEOUT =
        operatorConfig("startup.cache-sync-timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(2))
                .withDescription("Cache sync timeout for JOSDK informer startup.");{code}
 

 

 
{code:java}
// FlinkOperator#overrideOperatorConfigs()
private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
    overrider.withKubernetesClient(client);
    var conf = configManager.getDefaultConfig();
    var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
    int parallelism = operatorConf.getReconcilerMaxParallelism();
    if (parallelism == -1) {
        LOG.info("Configuring operator with unbounded reconciliation thread pool.");
        overrider.withExecutorService(Executors.newCachedThreadPool());
    } else {
        LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
        overrider.withConcurrentReconciliationThreads(parallelism);
    }

    if (operatorConf.isJosdkMetricsEnabled()) {
        overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
    }
    // override cache sync timeout
    overrider.withCacheSyncTimeout(conf.get(
        KubernetesOperatorConfigOptions.OPERATOR_CACHE_SYNC_TIMEOUT));
    // ...
}{code}
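
One optional refinement, which is not part of the proposal above, would be to 
guard against a missing or non-positive value before handing it to the 
overrider. The sketch below assumes a hypothetical helper effectiveTimeout in 
a hypothetical class CacheSyncTimeoutGuard, and falls back to the 2 minute 
default in those cases. It uses only the JDK and does not depend on Flink or 
JOSDK classes.

```java
import java.time.Duration;

// Hypothetical helper, shown only to illustrate one way to sanitize the configured value.
final class CacheSyncTimeoutGuard {

    // Mirrors the 2 minute default used by the proposed option.
    static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(2);

    // Returns the configured timeout, or the default if it is null, zero, or negative.
    static Duration effectiveTimeout(Duration configured) {
        if (configured == null || configured.isZero() || configured.isNegative()) {
            return DEFAULT_TIMEOUT;
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTimeout(Duration.ofMinutes(10))); // prints PT10M
        System.out.println(effectiveTimeout(Duration.ZERO));          // prints PT2M
        System.out.println(effectiveTimeout(null));                   // prints PT2M
    }
}
```

Whether to fall back silently, log a warning, or reject the configuration is a 
design choice left to the actual PR.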
 

I’d like to work on this issue and contribute a PR. If the proposal is 
accepted, I can open a PR and follow up with the updates.

 


> Expose JOSDK cacheSyncTimeout as operator config
> ------------------------------------------------
>
>                 Key: FLINK-39039
>                 URL: https://issues.apache.org/jira/browse/FLINK-39039
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: junzhong qin
>            Priority: Not a Priority
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
