junzhong qin created FLINK-39039:
------------------------------------
Summary: Expose JOSDK cacheSyncTimeout as operator config
Key: FLINK-39039
URL: https://issues.apache.org/jira/browse/FLINK-39039
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: junzhong qin
There are thousands of Flink deployments in our K8s cluster.
When we restarted the operator, informer startup failed with the following
log:
{code:java}
// Operator start log
2026-02-06 10:56:30,688 INFO io.javaoperatorsdk.operator.Operator [] [.] - Operator SDK 4.9.4 (commit: 3588780) built on 2024-08-30T00:52:57.000+0800 starting...
2026-02-06 10:56:30,689 INFO io.javaoperatorsdk.operator.Operator [] [.] - Client version: 6.13.2
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkdeploymentcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController, resource: org.apache.flink.kubernetes.operator.api.FlinkDeployment
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinkstatesnapshotcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController, resource: org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot
2026-02-06 10:56:30,691 INFO io.javaoperatorsdk.operator.processing.Controller [] [.] - Starting 'flinksessionjobcontroller' controller for reconciler: org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController, resource: org.apache.flink.kubernetes.operator.api.FlinkSessionJob
2026-02-06 10:56:30,734 WARN io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinksessionjobs' with unstable version 'v1beta1'
2026-02-06 10:56:30,734 WARN io.fabric8.kubernetes.client.dsl.internal.VersionUsageUtils [] [.] - The client is using resource type 'flinkdeployments' with unstable version 'v1beta1'
2026-02-06 10:58:30,739 ERROR io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper [] [.] - Informer startup error. Operator will be stopped. Informer: flink.apache.org/v1beta1/flinkdeployments
java.util.concurrent.TimeoutException: null
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1892) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2027) ~[?:?]
    at io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper.start(InformerWrapper.java:88) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager.lambda$start$0(InformerManager.java:62) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.lambda$executeAndWaitForAllToComplete$0(ExecutorServiceManager.java:61) ~[flink-kubernetes-operator-1.11-SNAPSHOT-shaded.jar:1.11-SNAPSHOT]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
{code}
The log shows that the informer startup error occurred 2 minutes after the
Operator SDK started (10:56:30 to 10:58:30). According to the [Java Operator SDK
doc|https://javadoc.io/static/io.javaoperatorsdk/operator-framework-core/5.0.0/io/javaoperatorsdk/operator/api/config/ConfigurationService.html?utm_source=openai#cacheSyncTimeout()],
the default cacheSyncTimeout is 2 minutes, which matches this delay.
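To illustrate the failure mode, here is a minimal, dependency-free sketch of a bounded wait like the one the stack trace suggests (a `CompletableFuture.get` with a timeout). The class `CacheSyncTimeoutDemo` and the method `waitForSync` are hypothetical names used only for this illustration; this is not the JOSDK implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for an informer start that waits for its cache to sync. */
final class CacheSyncTimeoutDemo {

    /**
     * Waits for the future up to the given timeout.
     * Returns true if the sync finished in time, false if the wait timed out.
     */
    static boolean waitForSync(CompletableFuture<Void> synced, Duration timeout) {
        try {
            synced.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // In the log above, this is the case reported as "Informer startup error".
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for cache sync", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Cache sync failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that never completes models a cache that needs longer than the timeout.
        CompletableFuture<Void> neverSynced = new CompletableFuture<>();
        System.out.println(waitForSync(neverSynced, Duration.ofMillis(50)));

        // An already completed future models a cache that syncs in time.
        System.out.println(
                waitForSync(CompletableFuture.completedFuture(null), Duration.ofMillis(50)));
    }
}
```

With a fixed timeout, a cluster whose initial list of resources takes longer than that bound will hit the timeout branch on every restart, regardless of whether the sync would eventually succeed.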
We therefore propose a new ConfigOption,
`kubernetes.operator.startup.cache-sync-timeout`, to make this timeout
configurable.
{code:java}
// KubernetesOperatorConfigOptions
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> OPERATOR_CACHE_SYNC_TIMEOUT =
        operatorConfig("startup.cache-sync-timeout")
                .durationType()
                .defaultValue(Duration.ofMinutes(2))
                .withDescription("Cache sync timeout for JOSDK informer startup.");
{code}
{code:java}
// FlinkOperator#overrideOperatorConfigs()
private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
    overrider.withKubernetesClient(client);
    var conf = configManager.getDefaultConfig();
    var operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
    int parallelism = operatorConf.getReconcilerMaxParallelism();
    if (parallelism == -1) {
        LOG.info("Configuring operator with unbounded reconciliation thread pool.");
        overrider.withExecutorService(Executors.newCachedThreadPool());
    } else {
        LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
        overrider.withConcurrentReconciliationThreads(parallelism);
    }
    if (operatorConf.isJosdkMetricsEnabled()) {
        overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
    }
    // override cache sync timeout
    overrider.withCacheSyncTimeout(
            conf.get(KubernetesOperatorConfigOptions.OPERATOR_CACHE_SYNC_TIMEOUT));
    // ...
}
{code}
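The intended lookup behavior (use the configured value if present, otherwise fall back to 2 minutes) can be sketched without any Flink dependencies. The class `CacheSyncTimeoutOption` and the method `resolve` are hypothetical, and the sketch assumes ISO-8601 duration strings such as "PT10M" for simplicity; Flink parses duration options with its own rules, which this sketch does not reproduce.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical, dependency-free stand-in for resolving the proposed option. */
final class CacheSyncTimeoutOption {

    // Key proposed in this issue.
    static final String KEY = "kubernetes.operator.startup.cache-sync-timeout";

    // Default proposed in this issue; it matches the JOSDK default of 2 minutes.
    static final Duration DEFAULT = Duration.ofMinutes(2);

    /**
     * Returns the configured timeout, or the default when the key is absent.
     * Assumption: values are ISO-8601 durations (for example "PT10M").
     */
    static Duration resolve(Map<String, String> conf) {
        String raw = conf.get(KEY);
        return raw == null ? DEFAULT : Duration.parse(raw.trim());
    }

    public static void main(String[] args) {
        // No explicit setting: behavior stays the same as today.
        System.out.println(resolve(Map.of())); // prints PT2M

        // Larger value for a cluster with thousands of deployments.
        System.out.println(resolve(Map.of(KEY, "PT10M"))); // prints PT10M
    }
}
```

Keeping the default at 2 minutes means existing deployments see no behavior change unless they set the option explicitly.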
I’d like to work on this issue and contribute a PR. If the proposal is
accepted, I can open a PR and follow up with the updates.