XComp commented on a change in pull request #13186:
URL: https://github.com/apache/flink/pull/13186#discussion_r472002192



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceEventHandler.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Testing implementation of {@link ResourceEventHandler}.
+ */
+public class TestingResourceEventHandler<WorkerType extends 
ResourceIDRetrievable> implements ResourceEventHandler<WorkerType> {
+
+       private final Consumer<Collection<WorkerType>> 
onPreviousAttemptWorkersRecoveredConsumer;
+       private final Consumer<ResourceID> onWorkerTerminatedConsumer;
+       private final Consumer<Throwable> onErrorConsumer;
+
+       private TestingResourceEventHandler(
+                       Consumer<Collection<WorkerType>> 
onPreviousAttemptWorkersRecoveredConsumer,
+                       Consumer<ResourceID> onWorkerTerminatedConsumer,
+                       Consumer<Throwable> onErrorConsumer) {
+               this.onPreviousAttemptWorkersRecoveredConsumer = 
onPreviousAttemptWorkersRecoveredConsumer;
+               this.onWorkerTerminatedConsumer = onWorkerTerminatedConsumer;
+               this.onErrorConsumer = onErrorConsumer;
+       }
+
+       @Override
+       public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> 
recoveredWorkers) {
+               
onPreviousAttemptWorkersRecoveredConsumer.accept(recoveredWorkers);
+       }
+
+       @Override
+       public void onWorkerTerminated(ResourceID resourceId) {
+               onWorkerTerminatedConsumer.accept(resourceId);
+       }
+
+       @Override
+       public void onError(Throwable exception) {
+               onErrorConsumer.accept(exception);
+       }
+
+       public static class Builder<WorkerType extends ResourceIDRetrievable> {

Review comment:
       Checkstyle complains about missing JavaDoc here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends 
AbstractResourceManagerDriver<KubernetesWorkerNode>
+       implements FlinkKubeClient.PodCallbackHandler {
+
+       /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+       private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+       private final String clusterId;
+
+       private final Time podCreationRetryInterval;
+
+       private final FlinkKubeClient kubeClient;
+
+       /** Request resource futures, keyed by pod names. */
+       private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+       /** When ResourceManager failover, the max attempt should recover. */
+       private long currentMaxAttemptId = 0;
+
+       /** Current max pod index. When creating a new pod, it should increase 
one. */
+       private long currentMaxPodId = 0;
+
+       private KubernetesWatch podsWatch;
+
+       /**
+        * Incompletion of this future indicates that there was a pod creation 
failure recently and the driver should not
+        * retry creating pods until the future become completed again. It's 
guaranteed to be modified in main thread.
+        */
+       private CompletableFuture<Void> podCreationCoolDown;
+
+       public KubernetesResourceManagerDriver(
+                       Configuration flinkConfig,
+                       FlinkKubeClient kubeClient,
+                       KubernetesResourceManagerDriverConfiguration 
configuration) {
+               super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+               this.clusterId = configuration.getClusterId();
+               this.podCreationRetryInterval = 
configuration.getPodCreationRetryInterval();
+               this.kubeClient = kubeClient;

Review comment:
       Should we add a `null` check here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract common base class for implementations of {@link 
ResourceManagerDriver}.
+ */
+public abstract class AbstractResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable>
+       implements ResourceManagerDriver<WorkerType> {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       protected final Configuration flinkConfig;
+       protected final Configuration flinkClientConfig;
+
+       private ResourceEventHandler<WorkerType> resourceEventHandler = null;
+       private ScheduledExecutor mainThreadExecutor = null;
+
+       public AbstractResourceManagerDriver(
+                       final Configuration flinkConfig,
+                       final Configuration flinkClientConfig) {
+               this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+               this.flinkClientConfig = 
Preconditions.checkNotNull(flinkClientConfig);
+       }
+
+       protected final ResourceEventHandler<WorkerType> 
getResourceEventHandler() {
+               return Preconditions.checkNotNull(

Review comment:
       How about using `IllegalStateException` instead of 
`NullPointerException` here? Wouldn't that describe the error class in a better 
way?

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KubernetesResourceManagerDriver}.
+ */
+public class KubernetesResourceManagerDriverTest extends 
ResourceManagerDriverTestBase<KubernetesWorkerNode> {
+
+       private final static String CLUSTER_ID = "testing-flink-cluster";
+       private final static Time POD_CREATION_INTERVAL = 
Time.milliseconds(50L);
+       private final static KubernetesResourceManagerDriverConfiguration 
KUBERNETES_RESOURCE_MANAGER_CONFIGURATION =
+                       new 
KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, POD_CREATION_INTERVAL);

Review comment:
       Checkstyle is complaining about the order of keywords here: `private 
static final` is the way to go.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
+import 
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class KubernetesResourceManagerDriver extends 
AbstractResourceManagerDriver<KubernetesWorkerNode>
+       implements FlinkKubeClient.PodCallbackHandler {
+
+       /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+       private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+       private final String clusterId;
+
+       private final Time podCreationRetryInterval;
+
+       private final FlinkKubeClient kubeClient;
+
+       /** Request resource futures, keyed by pod names. */
+       private final Map<String, CompletableFuture<KubernetesWorkerNode>> 
requestResourceFutures;
+
+       /** When ResourceManager failover, the max attempt should recover. */
+       private long currentMaxAttemptId = 0;
+
+       /** Current max pod index. When creating a new pod, it should increase 
one. */
+       private long currentMaxPodId = 0;
+
+       private KubernetesWatch podsWatch;
+
+       /**
+        * Incompletion of this future indicates that there was a pod creation 
failure recently and the driver should not
+        * retry creating pods until the future become completed again. It's 
guaranteed to be modified in main thread.
+        */
+       private CompletableFuture<Void> podCreationCoolDown;
+
+       public KubernetesResourceManagerDriver(
+                       Configuration flinkConfig,
+                       FlinkKubeClient kubeClient,
+                       KubernetesResourceManagerDriverConfiguration 
configuration) {
+               super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+               this.clusterId = configuration.getClusterId();
+               this.podCreationRetryInterval = 
configuration.getPodCreationRetryInterval();
+               this.kubeClient = kubeClient;
+               requestResourceFutures = new HashMap<>();
+               podCreationCoolDown = FutureUtils.completedVoidFuture();

Review comment:
       Maybe align the member access: either use `this.` in all cases or in 
none.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Testing implementation of {@link FlinkKubeClient}.
+ */
+public class TestingFlinkKubeClient implements FlinkKubeClient {
+
+       private final Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction;
+       private final Function<String, CompletableFuture<Void>> stopPodFunction;
+       private final Consumer<String> stopAndCleanupClusterConsumer;
+       private final Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction;
+       private final BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction;
+
+       private TestingFlinkKubeClient(
+                       Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction,
+                       Function<String, CompletableFuture<Void>> 
stopPodFunction,
+                       Consumer<String> stopAndCleanupClusterConsumer,
+                       Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction,
+                       BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction) {
+
+               this.createTaskManagerPodFunction = 
createTaskManagerPodFunction;
+               this.stopPodFunction = stopPodFunction;
+               this.stopAndCleanupClusterConsumer = 
stopAndCleanupClusterConsumer;
+               this.getPodsWithLabelsFunction = getPodsWithLabelsFunction;
+               this.watchPodsAndDoCallbackFunction = 
watchPodsAndDoCallbackFunction;
+       }
+
+       @Override
+       public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Void> createTaskManagerPod(KubernetesPod 
kubernetesPod) {
+               return createTaskManagerPodFunction.apply(kubernetesPod);
+       }
+
+       @Override
+       public CompletableFuture<Void> stopPod(String podName) {
+               return stopPodFunction.apply(podName);
+       }
+
+       @Override
+       public void stopAndCleanupCluster(String clusterId) {
+               stopAndCleanupClusterConsumer.accept(clusterId);
+       }
+
+       @Override
+       public Optional<KubernetesService> getRestService(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Optional<Endpoint> getRestEndpoint(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<KubernetesPod> getPodsWithLabels(Map<String, String> 
labels) {
+               return getPodsWithLabelsFunction.apply(labels);
+       }
+
+       @Override
+       public void handleException(Exception e) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public KubernetesWatch watchPodsAndDoCallback(Map<String, String> 
labels, PodCallbackHandler podCallbackHandler) {
+               return watchPodsAndDoCallbackFunction.apply(labels, 
podCallbackHandler);
+       }
+
+       @Override
+       public void close() throws Exception {
+               // noop
+       }
+
+       public static class Builder {

Review comment:
       Syntactic sugar: a `static` factory method 
`TestingFlinkKubeClient.builder()` could be offered here as well, instead of 
having callers write `new TestingFlinkKubeClient.Builder()`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Testing implementation of {@link FlinkKubeClient}.
+ */
+public class TestingFlinkKubeClient implements FlinkKubeClient {
+
+       private final Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction;
+       private final Function<String, CompletableFuture<Void>> stopPodFunction;
+       private final Consumer<String> stopAndCleanupClusterConsumer;
+       private final Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction;
+       private final BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction;
+
+       private TestingFlinkKubeClient(
+                       Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction,
+                       Function<String, CompletableFuture<Void>> 
stopPodFunction,
+                       Consumer<String> stopAndCleanupClusterConsumer,
+                       Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction,
+                       BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction) {
+
+               this.createTaskManagerPodFunction = 
createTaskManagerPodFunction;
+               this.stopPodFunction = stopPodFunction;
+               this.stopAndCleanupClusterConsumer = 
stopAndCleanupClusterConsumer;
+               this.getPodsWithLabelsFunction = getPodsWithLabelsFunction;
+               this.watchPodsAndDoCallbackFunction = 
watchPodsAndDoCallbackFunction;
+       }
+
+       @Override
+       public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Void> createTaskManagerPod(KubernetesPod 
kubernetesPod) {
+               return createTaskManagerPodFunction.apply(kubernetesPod);
+       }
+
+       @Override
+       public CompletableFuture<Void> stopPod(String podName) {
+               return stopPodFunction.apply(podName);
+       }
+
+       @Override
+       public void stopAndCleanupCluster(String clusterId) {
+               stopAndCleanupClusterConsumer.accept(clusterId);
+       }
+
+       @Override
+       public Optional<KubernetesService> getRestService(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Optional<Endpoint> getRestEndpoint(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<KubernetesPod> getPodsWithLabels(Map<String, String> 
labels) {
+               return getPodsWithLabelsFunction.apply(labels);
+       }
+
+       @Override
+       public void handleException(Exception e) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public KubernetesWatch watchPodsAndDoCallback(Map<String, String> 
labels, PodCallbackHandler podCallbackHandler) {
+               return watchPodsAndDoCallbackFunction.apply(labels, 
podCallbackHandler);
+       }
+
+       @Override
+       public void close() throws Exception {
+               // noop
+       }
+
+       public static class Builder {
+               private Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction =
+                               (ignore) -> FutureUtils.completedVoidFuture();
+               private Function<String, CompletableFuture<Void>> 
stopPodFunction =
+                               (ignore) -> FutureUtils.completedVoidFuture();
+               private Consumer<String> stopAndCleanupClusterConsumer =
+                               (ignore) -> {};
+               private Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction =
+                               (ignore) -> Collections.emptyList();
+               private Consumer<Exception> handleExceptionConsumer =
+                               (ignore) -> {};

Review comment:
       The `handleExceptionConsumer` is never used. Did you miss adding it to 
the `TestingFlinkKubeClient` constructor? 
`TestingFlinkKubeClient.handleException(Exception)` is throwing an 
`UnsupportedOperationException()` right now. This needs to be fixed as well, if 
you decide to actually use the `handleExceptionConsumer`. Otherwise, the member 
can be removed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Common test cases for implementations of {@link ResourceManagerDriver}.
+ */
+public abstract class ResourceManagerDriverTestBase<WorkerType extends 
ResourceIDRetrievable> extends TestLogger {
+
+       protected static final long TIMEOUT_SEC = 5L;
+
+       protected static final TaskExecutorProcessSpec 
TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils
+                       .processSpecFromWorkerResourceSpec(new Configuration(), 
WorkerResourceSpec.ZERO);
+
+       private static final String MAIN_THREAD_NAME = 
"testing-rpc-main-thread";
+       private static final ScheduledExecutor MAIN_THREAD_EXECUTOR =
+                       new 
ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable
 -> new Thread(runnable, MAIN_THREAD_NAME)));
+
+       @Test
+       public void testInitialize() throws Exception {
+               final Context context = createContext();
+               context.runTest(context::validateInitialization);
+       }
+
+       @Test
+       public void testRecoverPreviousAttemptWorkers() throws Exception {
+               final CompletableFuture<Collection<WorkerType>> 
recoveredWorkersFuture = new CompletableFuture<>();
+               final Context context = createContext();
+               
context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete);
+               context.preparePreviousAttemptWorkers();
+               context.runTest(() -> 
context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC,
 TimeUnit.SECONDS)));
+       }
+
+       @Test
+       public void testTerminate() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       context.getDriver().terminate();
+                       context.validateTermination();
+               });
+       }
+
+       @Test
+       public void testDeregisterApplication() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       
context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);

Review comment:
       I realize that the value is not really used in the actual 
implementation. But shouldn't that be reflected in the test in a way that all 
values of the enum are tested having the same outcome?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract common base class for implementations of {@link 
ResourceManagerDriver}.
+ */
+public abstract class AbstractResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable>
+       implements ResourceManagerDriver<WorkerType> {
+
+       protected final Logger log = LoggerFactory.getLogger(getClass());
+
+       protected final Configuration flinkConfig;
+       protected final Configuration flinkClientConfig;
+
+       private ResourceEventHandler<WorkerType> resourceEventHandler = null;
+       private ScheduledExecutor mainThreadExecutor = null;
+
+       public AbstractResourceManagerDriver(
+                       final Configuration flinkConfig,
+                       final Configuration flinkClientConfig) {
+               this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+               this.flinkClientConfig = 
Preconditions.checkNotNull(flinkClientConfig);
+       }
+
+       protected final ResourceEventHandler<WorkerType> 
getResourceEventHandler() {
+               return Preconditions.checkNotNull(
+                               this.resourceEventHandler,
+                               "Cannot get resource event handler. Resource 
manager driver is not initialized.");
+       }
+
+       protected final ScheduledExecutor getMainThreadExecutor() {
+               return Preconditions.checkNotNull(

Review comment:
       Same here: I'd say that throwing an `IllegalStateException` would be a 
better match for the actual error here, since the private member being 
`null` is an implementation detail that does not need to be exposed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceEventHandler.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Testing implementation of {@link ResourceEventHandler}.
+ */
+public class TestingResourceEventHandler<WorkerType extends 
ResourceIDRetrievable> implements ResourceEventHandler<WorkerType> {
+
+       private final Consumer<Collection<WorkerType>> 
onPreviousAttemptWorkersRecoveredConsumer;
+       private final Consumer<ResourceID> onWorkerTerminatedConsumer;
+       private final Consumer<Throwable> onErrorConsumer;
+
+       private TestingResourceEventHandler(
+                       Consumer<Collection<WorkerType>> 
onPreviousAttemptWorkersRecoveredConsumer,
+                       Consumer<ResourceID> onWorkerTerminatedConsumer,
+                       Consumer<Throwable> onErrorConsumer) {
+               this.onPreviousAttemptWorkersRecoveredConsumer = 
onPreviousAttemptWorkersRecoveredConsumer;
+               this.onWorkerTerminatedConsumer = onWorkerTerminatedConsumer;
+               this.onErrorConsumer = onErrorConsumer;
+       }
+
+       @Override
+       public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> 
recoveredWorkers) {
+               
onPreviousAttemptWorkersRecoveredConsumer.accept(recoveredWorkers);
+       }
+
+       @Override
+       public void onWorkerTerminated(ResourceID resourceId) {
+               onWorkerTerminatedConsumer.accept(resourceId);
+       }
+
+       @Override
+       public void onError(Throwable exception) {
+               onErrorConsumer.accept(exception);
+       }
+
+       public static class Builder<WorkerType extends ResourceIDRetrievable> {
+               private Consumer<Collection<WorkerType>> 
onPreviousAttemptWorkersRecoveredConsumer = (ignore) -> {};
+               private Consumer<ResourceID> onWorkerTerminatedConsumer = 
(ignore) -> {};
+               private Consumer<Throwable> onErrorConsumer = (ignore) -> {};
+
+               public Builder<WorkerType> 
setOnPreviousAttemptWorkersRecoveredConsumer(

Review comment:
       That's just syntactic sugar, but: I always like having a static 
`builder()` method in the enclosing class instead of a public `Builder` 
constructor. This way, you won't need to use `new` when instantiating the 
`Builder` (like it's done 
[here](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java#L39-L53)).
   
   But that's just a suggestion...

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
+import 
org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KubernetesResourceManagerDriver}.
+ */
+public class KubernetesResourceManagerDriverTest extends 
ResourceManagerDriverTestBase<KubernetesWorkerNode> {
+
+       private final static String CLUSTER_ID = "testing-flink-cluster";
+       private final static Time POD_CREATION_INTERVAL = 
Time.milliseconds(50L);
+       private final static KubernetesResourceManagerDriverConfiguration 
KUBERNETES_RESOURCE_MANAGER_CONFIGURATION =
+                       new 
KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, POD_CREATION_INTERVAL);
+
+       @Test
+       public void testOnPodAdded() throws Exception {
+               new Context() {{
+                       final CompletableFuture<KubernetesPod> createPodFuture 
= new CompletableFuture<>();
+                       final CompletableFuture<KubernetesWorkerNode> 
requestResourceFuture = new CompletableFuture<>();
+
+                       
flinkKubeClientBuilder.setCreateTaskManagerPodFunction((pod) -> {
+                               createPodFuture.complete(pod);
+                               return FutureUtils.completedVoidFuture();
+                       });
+
+                       runTest(() -> {
+                               // request new pod
+                               runInMainThread(() -> 
getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC).thenAccept(requestResourceFuture::complete));
+                               final KubernetesPod pod = 
createPodFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               // prepare validation:
+                               // - complete requestResourceFuture in main 
thread with correct KubernetesWorkerNode
+                               final CompletableFuture<Void> validationFuture 
= requestResourceFuture.thenAccept((workerNode) -> {
+                                       validateInMainThread();
+                                       
assertThat(workerNode.getResourceID().toString(), is(pod.getName()));
+                               });
+
+                               // send onAdded event
+                               
getPodCallbackHandler().onAdded(Collections.singletonList(pod));
+
+                               // make sure finishing validation
+                               validationFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS);
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnPodModified() throws Exception {
+               new Context() {{
+                       testOnPodTerminated((pod) -> 
getPodCallbackHandler().onModified(pod));
+               }};
+       }
+
+       @Test
+       public void testOnPodDeleted() throws Exception {
+               new Context() {{
+                       testOnPodTerminated((pod) -> 
getPodCallbackHandler().onDeleted(pod));
+               }};
+       }
+
+       @Test
+       public void testOnError() throws Exception {
+               new Context() {{
+                       testOnPodTerminated((pod) -> 
getPodCallbackHandler().onError(pod));
+               }};
+       }
+
+       @Test
+       public void testFatalHandleError() throws Exception {
+               new Context() {{
+                       final CompletableFuture<Throwable> onErrorFuture = new 
CompletableFuture<>();
+                       
resourceEventHandlerBuilder.setOnErrorConsumer(onErrorFuture::complete);
+
+                       runTest(() -> {
+                               final Throwable testingError = new 
Throwable("testing error");
+                               
getPodCallbackHandler().handleFatalError(testingError);
+                               assertThat(onErrorFuture.get(TIMEOUT_SEC, 
TimeUnit.SECONDS), is(testingError));
+                       });
+               }};
+       }
+
+       @Test
+       public void testPodCreationInterval() throws Exception {
+               new Context() {{
+                       final AtomicInteger createPodCount = new 
AtomicInteger(0);
+                       final List<CompletableFuture<Long>> 
createPodTimeFutures = new ArrayList<>();
+                       createPodTimeFutures.add(new CompletableFuture<>());
+                       createPodTimeFutures.add(new CompletableFuture<>());
+
+                       
flinkKubeClientBuilder.setCreateTaskManagerPodFunction((ignore) -> {
+                               int idx = createPodCount.getAndIncrement();
+                               if (idx < createPodTimeFutures.size()) {
+                                       
createPodTimeFutures.get(idx).complete(System.currentTimeMillis());
+                               }
+                               return FutureUtils.completedExceptionally(new 
Throwable("testing error"));
+                       });
+
+                       runTest(() -> {
+                               // re-request resource on pod creation failed
+                               runInMainThread(() -> 
getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC)
+                                               .whenComplete((ignore1, 
ignore2) -> getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC)));
+
+                               // validate trying creating pod twice, with 
proper interval
+                               long t1 = 
createPodTimeFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               long t2 = 
createPodTimeFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               assertThat((t2 - t1), 
greaterThanOrEqualTo(POD_CREATION_INTERVAL.toMilliseconds()));
+                       });
+               }};
+       }
+
+       @Override
+       protected ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context 
createContext() {
+               return new Context();
+       }
+
+       private class Context extends 
ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context {
+               private final KubernetesPod PREVIOUS_ATTEMPT_POD = new 
TestingKubernetesPod(CLUSTER_ID + "-taskmanager-1-1");

Review comment:
       Checkstyle: non-`static` members should be camel-cased instead of 
snake-cased.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Common test cases for implementations of {@link ResourceManagerDriver}.
+ */
+public abstract class ResourceManagerDriverTestBase<WorkerType extends 
ResourceIDRetrievable> extends TestLogger {
+
+       protected static final long TIMEOUT_SEC = 5L;
+
+       protected static final TaskExecutorProcessSpec 
TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils
+                       .processSpecFromWorkerResourceSpec(new Configuration(), 
WorkerResourceSpec.ZERO);
+
+       private static final String MAIN_THREAD_NAME = 
"testing-rpc-main-thread";
+       private static final ScheduledExecutor MAIN_THREAD_EXECUTOR =
+                       new 
ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable
 -> new Thread(runnable, MAIN_THREAD_NAME)));
+
+       @Test
+       public void testInitialize() throws Exception {
+               final Context context = createContext();
+               context.runTest(context::validateInitialization);
+       }
+
+       @Test
+       public void testRecoverPreviousAttemptWorkers() throws Exception {
+               final CompletableFuture<Collection<WorkerType>> 
recoveredWorkersFuture = new CompletableFuture<>();
+               final Context context = createContext();
+               
context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete);
+               context.preparePreviousAttemptWorkers();
+               context.runTest(() -> 
context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC,
 TimeUnit.SECONDS)));
+       }
+
+       @Test
+       public void testTerminate() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       context.getDriver().terminate();
+                       context.validateTermination();
+               });
+       }
+
+       @Test
+       public void testDeregisterApplication() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       
context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+                       context.validateDeregisterApplication();
+               });
+       }
+
+       @Test
+       public void testRequestResource() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       context.runInMainThread(() -> 
context.getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC));
+                       
context.validateRequestedResources(Collections.singleton(TASK_EXECUTOR_PROCESS_SPEC));
+               });
+       }
+
+       @Test
+       public void testReleaseResource() throws Exception {
+               final CompletableFuture<WorkerType> requestResourceFuture = new 
CompletableFuture<>();
+               final CompletableFuture<WorkerType> releaseResourceFuture = new 
CompletableFuture<>();
+               final Context context = createContext();
+               context.runTest(() -> {
+                       context.runInMainThread(() -> context.getDriver()
+                                       
.requestResource(TASK_EXECUTOR_PROCESS_SPEC)
+                                       
.thenAccept(requestResourceFuture::complete));
+                       requestResourceFuture.thenApply((workerNode) ->
+                                       context.runInMainThread(() -> {
+                                               
context.getDriver().releaseResource(workerNode);
+                                               
releaseResourceFuture.complete(workerNode);
+                                       }));
+                       final WorkerType worker = 
releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                       
context.validateReleaseResources(Collections.singleton(worker));
+               });
+       }
+
+       protected abstract Context createContext();
+
+       protected abstract class Context {

Review comment:
       Checkstyle: Missing JavaDoc

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient;
+
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Testing implementation of {@link FlinkKubeClient}.
+ */
+public class TestingFlinkKubeClient implements FlinkKubeClient {
+
+       private final Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction;
+       private final Function<String, CompletableFuture<Void>> stopPodFunction;
+       private final Consumer<String> stopAndCleanupClusterConsumer;
+       private final Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction;
+       private final BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction;
+
+       private TestingFlinkKubeClient(
+                       Function<KubernetesPod, CompletableFuture<Void>> 
createTaskManagerPodFunction,
+                       Function<String, CompletableFuture<Void>> 
stopPodFunction,
+                       Consumer<String> stopAndCleanupClusterConsumer,
+                       Function<Map<String, String>, List<KubernetesPod>> 
getPodsWithLabelsFunction,
+                       BiFunction<Map<String, String>, PodCallbackHandler, 
KubernetesWatch> watchPodsAndDoCallbackFunction) {
+
+               this.createTaskManagerPodFunction = 
createTaskManagerPodFunction;
+               this.stopPodFunction = stopPodFunction;
+               this.stopAndCleanupClusterConsumer = 
stopAndCleanupClusterConsumer;
+               this.getPodsWithLabelsFunction = getPodsWithLabelsFunction;
+               this.watchPodsAndDoCallbackFunction = 
watchPodsAndDoCallbackFunction;
+       }
+
+       @Override
+       public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Void> createTaskManagerPod(KubernetesPod 
kubernetesPod) {
+               return createTaskManagerPodFunction.apply(kubernetesPod);
+       }
+
+       @Override
+       public CompletableFuture<Void> stopPod(String podName) {
+               return stopPodFunction.apply(podName);
+       }
+
+       @Override
+       public void stopAndCleanupCluster(String clusterId) {
+               stopAndCleanupClusterConsumer.accept(clusterId);
+       }
+
+       @Override
+       public Optional<KubernetesService> getRestService(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Optional<Endpoint> getRestEndpoint(String clusterId) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public List<KubernetesPod> getPodsWithLabels(Map<String, String> 
labels) {
+               return getPodsWithLabelsFunction.apply(labels);
+       }
+
+       @Override
+       public void handleException(Exception e) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public KubernetesWatch watchPodsAndDoCallback(Map<String, String> 
labels, PodCallbackHandler podCallbackHandler) {
+               return watchPodsAndDoCallbackFunction.apply(labels, 
podCallbackHandler);
+       }
+
+       @Override
+       public void close() throws Exception {
+               // noop
+       }
+
+       public static class Builder {

Review comment:
       Checkstyle: Missing JavaDoc

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Common test cases for implementations of {@link ResourceManagerDriver}.
+ */
+public abstract class ResourceManagerDriverTestBase<WorkerType extends 
ResourceIDRetrievable> extends TestLogger {
+
+       protected static final long TIMEOUT_SEC = 5L;
+
+       protected static final TaskExecutorProcessSpec 
TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils
+                       .processSpecFromWorkerResourceSpec(new Configuration(), 
WorkerResourceSpec.ZERO);
+
+       private static final String MAIN_THREAD_NAME = 
"testing-rpc-main-thread";
+       private static final ScheduledExecutor MAIN_THREAD_EXECUTOR =
+                       new 
ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable
 -> new Thread(runnable, MAIN_THREAD_NAME)));
+
+       @Test
+       public void testInitialize() throws Exception {
+               final Context context = createContext();
+               context.runTest(context::validateInitialization);
+       }
+
+       @Test
+       public void testRecoverPreviousAttemptWorkers() throws Exception {
+               final CompletableFuture<Collection<WorkerType>> 
recoveredWorkersFuture = new CompletableFuture<>();
+               final Context context = createContext();
+               
context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete);
+               context.preparePreviousAttemptWorkers();
+               context.runTest(() -> 
context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC,
 TimeUnit.SECONDS)));
+       }
+
+       @Test
+       public void testTerminate() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       context.getDriver().terminate();
+                       context.validateTermination();
+               });
+       }
+
+       @Test
+       public void testDeregisterApplication() throws Exception {
+               final Context context = createContext();
+               context.runTest(() -> {
+                       
context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);

Review comment:
       Is it enough to test for `ApplicationStatus.SUCCEEDED` here? I'm asking 
since there are other values in the `ApplicationStatus` enum that are not 
tested.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Factory class for creating {@link ActiveResourceManager} with various 
implementations of {@link ResourceManagerDriver}.
+ */
+public abstract class ActiveResourceManagerFactory<WorkerType extends 
ResourceIDRetrievable>
+               extends ResourceManagerFactory<WorkerType> {
+
+       @Override
+       public ResourceManager<WorkerType> createResourceManager(
+                       Configuration configuration,
+                       ResourceID resourceId,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       FatalErrorHandler fatalErrorHandler,
+                       ClusterInformation clusterInformation,
+                       @Nullable String webInterfaceUrl,
+                       MetricRegistry metricRegistry,
+                       String hostname) throws Exception {
+               return super.createResourceManager(
+                               
createActiveResourceManagerConfiguration(configuration),
+                               resourceId,
+                               rpcService,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               fatalErrorHandler,
+                               clusterInformation,
+                               webInterfaceUrl,
+                               metricRegistry,
+                               hostname);
+       }
+
+       private Configuration 
createActiveResourceManagerConfiguration(Configuration originalConfiguration) {
+               final Configuration copiedConfig = new 
Configuration(originalConfiguration);
+               // In active mode, it's depend on the ResourceManager to set 
the ResourceID of TaskManagers.

Review comment:
       ```suggestion
                // In active mode, it depends on the ResourceManager to set the ResourceID of TaskManagers.
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to