tillrohrmann commented on a change in pull request #14837: URL: https://github.com/apache/flink/pull/14837#discussion_r571896290
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java ########## @@ -114,10 +127,24 @@ public void onError(List<KubernetesConfigMap> configMaps) { } @Override - public void handleFatalError(Throwable throwable) { - fatalErrorHandler.onFatalError( - new LeaderRetrievalException( - "Error while watching the ConfigMap " + configMapName)); + public void handleError(Throwable throwable) { + if (throwable instanceof KubernetesTooOldResourceVersionException) { + synchronized (watchLock) { + if (running) { + if (kubernetesWatch != null) { + kubernetesWatch.close(); + } + LOG.info("Creating a new watch on ConfigMap {}.", configMapName); + kubernetesWatch = + kubeClient.watchConfigMaps( + configMapName, new ConfigMapCallbackHandlerImpl()); + } + } Review comment: Same here. It would be great to guard this behaviour with a test. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -359,8 +372,20 @@ public void onError(List<KubernetesPod> pods) { } @Override - public void handleFatalError(Throwable throwable) { - getResourceEventHandler().onError(throwable); + public void handleError(Throwable throwable) { + if (throwable instanceof KubernetesTooOldResourceVersionException) { + getMainThreadExecutor() + .execute( + () -> { + if (running) { + podsWatchOpt.ifPresent(KubernetesWatch::close); + log.info("Creating a new watch on TaskManager pods."); + podsWatchOpt = watchTaskManagerPods(); + } + }); Review comment: Great :-) I think the only thing missing is a test case for this error case. The test case should make sure that we create a new watch if a `KubernetesTooOldResourceVersionException` occurs. 
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java ########## @@ -234,10 +245,24 @@ public void onError(List<KubernetesConfigMap> configMaps) { } @Override - public void handleFatalError(Throwable throwable) { - fatalErrorHandler.onFatalError( - new LeaderElectionException( - "Error while watching the ConfigMap " + configMapName, throwable)); + public void handleError(Throwable throwable) { + if (throwable instanceof KubernetesTooOldResourceVersionException) { + synchronized (watchLock) { + if (running) { + if (kubernetesWatch != null) { + kubernetesWatch.close(); + } + LOG.info("Creating a new watch on ConfigMap {}.", configMapName); + kubernetesWatch = + kubeClient.watchConfigMaps( + configMapName, new ConfigMapCallbackHandlerImpl()); + } + } Review comment: Testing this behaviour would be great as well. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org