tillrohrmann commented on a change in pull request #11010:
URL: https://github.com/apache/flink/pull/11010#discussion_r425292724
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -91,6 +93,8 @@
/** The number of pods requested, but not yet granted. */
private int numPendingPodRequests = 0;
+ private KubernetesWatch podsWatch;
Review comment:
I think it is a good idea to limit the surface area of external
dependencies as much as possible. This makes future refactorings (e.g. changing
the actual implementation to use a different client) much easier.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -148,6 +156,7 @@ protected void initialize() throws ResourceManagerException
{
Throwable exception = null;
try {
+ podsWatch.close();
Review comment:
This is a good idea. We can combine multiple exceptions coming from
different services via `ExceptionUtils.firstOrSuppressed`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+
+/**
+ * Represent Watcher resource for Pods in kubernetes.
Review comment:
```suggestion
* Watcher for pods in Kubernetes.
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatch.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import io.fabric8.kubernetes.client.Watch;
+
+/**
+ * Represent Watch resource in kubernetes.
Review comment:
```suggestion
* Watch resource in Kubernetes.
```
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+
+/**
+ * Represent Watcher resource for Pods in kubernetes.
+ */
+public class KubernetesPodsWatcher implements Watcher<Pod> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesPodsWatcher.class);
+
+ private final FlinkKubeClient.PodCallbackHandler podsCallbackHandler;
+ private final Consumer<Exception> podsWatcherCloseHandler;
+
+ public KubernetesPodsWatcher(
+ FlinkKubeClient.PodCallbackHandler callbackHandler,
+ Consumer<Exception> podsWatcherCloseHandler) {
Review comment:
This could also be a `FatalErrorHandler`.
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -96,9 +98,10 @@
* Watch the pods selected by labels and do the {@link
PodCallbackHandler}.
*
* @param labels labels to filter the pods to watch
- * @param callbackHandler {@link PodCallbackHandler} will be called
when the watcher receive the corresponding events.
+ * @param podsWatcher watcher used to process the {@link
PodCallbackHandler}
+ * @return Return a watch for pods. It needs to be closed after use.
*/
- void watchPodsAndDoCallback(Map<String, String> labels,
PodCallbackHandler callbackHandler);
+ KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels,
KubernetesPodsWatcher podsWatcher);
Review comment:
With this change, we make a fabric8 class part of the API since
`KubernetesPodsWatcher` extends `Watcher<Pod>`. Why don't we add a
`handleError` method to `PodCallBackHandler`?
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesPodsWatcher.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.function.Consumer;
+
+/**
+ * Represent Watcher resource for Pods in kubernetes.
+ */
+public class KubernetesPodsWatcher implements Watcher<Pod> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(KubernetesPodsWatcher.class);
+
+ private final FlinkKubeClient.PodCallbackHandler podsCallbackHandler;
+ private final Consumer<Exception> podsWatcherCloseHandler;
+
+ public KubernetesPodsWatcher(
+ FlinkKubeClient.PodCallbackHandler callbackHandler,
+ Consumer<Exception> podsWatcherCloseHandler) {
+ this.podsCallbackHandler = callbackHandler;
+ this.podsWatcherCloseHandler = podsWatcherCloseHandler;
+ }
+
+ @Override
+ public void eventReceived(Action action, Pod pod) {
+ LOG.debug("Received {} event for pod {}, details: {}", action,
pod.getMetadata().getName(), pod.getStatus());
+ switch (action) {
+ case ADDED:
+
podsCallbackHandler.onAdded(Collections.singletonList(new KubernetesPod(pod)));
+ break;
+ case MODIFIED:
+
podsCallbackHandler.onModified(Collections.singletonList(new
KubernetesPod(pod)));
+ break;
+ case ERROR:
+
podsCallbackHandler.onError(Collections.singletonList(new KubernetesPod(pod)));
+ break;
+ case DELETED:
+
podsCallbackHandler.onDeleted(Collections.singletonList(new
KubernetesPod(pod)));
+ break;
+ default:
+ LOG.debug("Ignore handling {} event for pod
{}", action, pod.getMetadata().getName());
+ break;
+ }
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ LOG.debug("The pods watcher is closing with exception {}.",
cause == null ? "null" : cause);
Review comment:
Sounds like a good idea.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]