XComp commented on a change in pull request #13186: URL: https://github.com/apache/flink/pull/13186#discussion_r472002192
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceEventHandler.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.function.Consumer; + +/** + * Testing implementation of {@link ResourceEventHandler}. 
+ */ +public class TestingResourceEventHandler<WorkerType extends ResourceIDRetrievable> implements ResourceEventHandler<WorkerType> { + + private final Consumer<Collection<WorkerType>> onPreviousAttemptWorkersRecoveredConsumer; + private final Consumer<ResourceID> onWorkerTerminatedConsumer; + private final Consumer<Throwable> onErrorConsumer; + + private TestingResourceEventHandler( + Consumer<Collection<WorkerType>> onPreviousAttemptWorkersRecoveredConsumer, + Consumer<ResourceID> onWorkerTerminatedConsumer, + Consumer<Throwable> onErrorConsumer) { + this.onPreviousAttemptWorkersRecoveredConsumer = onPreviousAttemptWorkersRecoveredConsumer; + this.onWorkerTerminatedConsumer = onWorkerTerminatedConsumer; + this.onErrorConsumer = onErrorConsumer; + } + + @Override + public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers) { + onPreviousAttemptWorkersRecoveredConsumer.accept(recoveredWorkers); + } + + @Override + public void onWorkerTerminated(ResourceID resourceId) { + onWorkerTerminatedConsumer.accept(resourceId); + } + + @Override + public void onError(Throwable exception) { + onErrorConsumer.accept(exception); + } + + public static class Builder<WorkerType extends ResourceIDRetrievable> { Review comment: Checkstyle complains about missing JavaDoc here. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. + */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final Time podCreationRetryInterval; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + /** + * Incompletion of this future indicates that there was a pod creation failure recently and the driver should not + * retry creating pods until the future become completed again. It's guaranteed to be modified in main thread. 
+ */ + private CompletableFuture<Void> podCreationCoolDown; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerDriverConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = configuration.getClusterId(); + this.podCreationRetryInterval = configuration.getPodCreationRetryInterval(); + this.kubeClient = kubeClient; Review comment: Should we add a `null` check here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract common base class for implementations of {@link ResourceManagerDriver}. 
+ */ +public abstract class AbstractResourceManagerDriver<WorkerType extends ResourceIDRetrievable> + implements ResourceManagerDriver<WorkerType> { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + protected final Configuration flinkConfig; + protected final Configuration flinkClientConfig; + + private ResourceEventHandler<WorkerType> resourceEventHandler = null; + private ScheduledExecutor mainThreadExecutor = null; + + public AbstractResourceManagerDriver( + final Configuration flinkConfig, + final Configuration flinkClientConfig) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.flinkClientConfig = Preconditions.checkNotNull(flinkClientConfig); + } + + protected final ResourceEventHandler<WorkerType> getResourceEventHandler() { + return Preconditions.checkNotNull( Review comment: How about using `IllegalStateException` instead of `NullPointerException` here? Wouldn't that describe the error class in a better way? ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler; +import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link KubernetesResourceManagerDriver}. 
+ */ +public class KubernetesResourceManagerDriverTest extends ResourceManagerDriverTestBase<KubernetesWorkerNode> { + + private final static String CLUSTER_ID = "testing-flink-cluster"; + private final static Time POD_CREATION_INTERVAL = Time.milliseconds(50L); + private final static KubernetesResourceManagerDriverConfiguration KUBERNETES_RESOURCE_MANAGER_CONFIGURATION = + new KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, POD_CREATION_INTERVAL); Review comment: Checkstyle is complaining about the order of keywords here: `private static final` is the way to go. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. 
+ */ +public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> + implements FlinkKubeClient.PodCallbackHandler { + + /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */ + private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d"; + + private final String clusterId; + + private final Time podCreationRetryInterval; + + private final FlinkKubeClient kubeClient; + + /** Request resource futures, keyed by pod names. */ + private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures; + + /** When ResourceManager failover, the max attempt should recover. */ + private long currentMaxAttemptId = 0; + + /** Current max pod index. When creating a new pod, it should increase one. */ + private long currentMaxPodId = 0; + + private KubernetesWatch podsWatch; + + /** + * Incompletion of this future indicates that there was a pod creation failure recently and the driver should not + * retry creating pods until the future become completed again. It's guaranteed to be modified in main thread. + */ + private CompletableFuture<Void> podCreationCoolDown; + + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerDriverConfiguration configuration) { + super(flinkConfig, GlobalConfiguration.loadConfiguration()); + + this.clusterId = configuration.getClusterId(); + this.podCreationRetryInterval = configuration.getPodCreationRetryInterval(); + this.kubeClient = kubeClient; + requestResourceFutures = new HashMap<>(); + podCreationCoolDown = FutureUtils.completedVoidFuture(); Review comment: Maybe align the member access: either use `this.` in all cases or in none.
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient; + +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Testing implementation of {@link FlinkKubeClient}. 
+ */ +public class TestingFlinkKubeClient implements FlinkKubeClient { + + private final Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction; + private final Function<String, CompletableFuture<Void>> stopPodFunction; + private final Consumer<String> stopAndCleanupClusterConsumer; + private final Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction; + private final BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction; + + private TestingFlinkKubeClient( + Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction, + Function<String, CompletableFuture<Void>> stopPodFunction, + Consumer<String> stopAndCleanupClusterConsumer, + Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction, + BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction) { + + this.createTaskManagerPodFunction = createTaskManagerPodFunction; + this.stopPodFunction = stopPodFunction; + this.stopAndCleanupClusterConsumer = stopAndCleanupClusterConsumer; + this.getPodsWithLabelsFunction = getPodsWithLabelsFunction; + this.watchPodsAndDoCallbackFunction = watchPodsAndDoCallbackFunction; + } + + @Override + public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) { + return createTaskManagerPodFunction.apply(kubernetesPod); + } + + @Override + public CompletableFuture<Void> stopPod(String podName) { + return stopPodFunction.apply(podName); + } + + @Override + public void stopAndCleanupCluster(String clusterId) { + stopAndCleanupClusterConsumer.accept(clusterId); + } + + @Override + public Optional<KubernetesService> getRestService(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<Endpoint> 
getRestEndpoint(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { + return getPodsWithLabelsFunction.apply(labels); + } + + @Override + public void handleException(Exception e) { + throw new UnsupportedOperationException(); + } + + @Override + public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler podCallbackHandler) { + return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler); + } + + @Override + public void close() throws Exception { + // noop + } + + public static class Builder { Review comment: Syntactic sugar: a `static` factory method `TestingFlinkKubeClient.builder()` could be used here instead of calling `new TestingFlinkKubeClient.Builder()`. ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.kubernetes.kubeclient; + +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Testing implementation of {@link FlinkKubeClient}. + */ +public class TestingFlinkKubeClient implements FlinkKubeClient { + + private final Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction; + private final Function<String, CompletableFuture<Void>> stopPodFunction; + private final Consumer<String> stopAndCleanupClusterConsumer; + private final Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction; + private final BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction; + + private TestingFlinkKubeClient( + Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction, + Function<String, CompletableFuture<Void>> stopPodFunction, + Consumer<String> stopAndCleanupClusterConsumer, + Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction, + BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction) { + + this.createTaskManagerPodFunction = createTaskManagerPodFunction; + this.stopPodFunction = stopPodFunction; + this.stopAndCleanupClusterConsumer = stopAndCleanupClusterConsumer; + this.getPodsWithLabelsFunction = getPodsWithLabelsFunction; + this.watchPodsAndDoCallbackFunction = watchPodsAndDoCallbackFunction; + } + + @Override + public void 
createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) { + return createTaskManagerPodFunction.apply(kubernetesPod); + } + + @Override + public CompletableFuture<Void> stopPod(String podName) { + return stopPodFunction.apply(podName); + } + + @Override + public void stopAndCleanupCluster(String clusterId) { + stopAndCleanupClusterConsumer.accept(clusterId); + } + + @Override + public Optional<KubernetesService> getRestService(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<Endpoint> getRestEndpoint(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { + return getPodsWithLabelsFunction.apply(labels); + } + + @Override + public void handleException(Exception e) { + throw new UnsupportedOperationException(); + } + + @Override + public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler podCallbackHandler) { + return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler); + } + + @Override + public void close() throws Exception { + // noop + } + + public static class Builder { + private Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction = + (ignore) -> FutureUtils.completedVoidFuture(); + private Function<String, CompletableFuture<Void>> stopPodFunction = + (ignore) -> FutureUtils.completedVoidFuture(); + private Consumer<String> stopAndCleanupClusterConsumer = + (ignore) -> {}; + private Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction = + (ignore) -> Collections.emptyList(); + private Consumer<Exception> handleExceptionConsumer = + (ignore) -> {}; Review comment: The `handleExceptionConsumer` is never used. 
Did you miss adding it to the `TestingFlinkKubeClient` constructor? `TestingFlinkKubeClient.handleException(Exception)` is throwing an `UnsupportedOperationException()` right now. This needs to be fixed as well if you decide to actually use the `handleExceptionConsumer`. Otherwise, the member can be removed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Common test cases for implementations of {@link ResourceManagerDriver}. 
+ */ +public abstract class ResourceManagerDriverTestBase<WorkerType extends ResourceIDRetrievable> extends TestLogger { + + protected static final long TIMEOUT_SEC = 5L; + + protected static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils + .processSpecFromWorkerResourceSpec(new Configuration(), WorkerResourceSpec.ZERO); + + private static final String MAIN_THREAD_NAME = "testing-rpc-main-thread"; + private static final ScheduledExecutor MAIN_THREAD_EXECUTOR = + new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, MAIN_THREAD_NAME))); + + @Test + public void testInitialize() throws Exception { + final Context context = createContext(); + context.runTest(context::validateInitialization); + } + + @Test + public void testRecoverPreviousAttemptWorkers() throws Exception { + final CompletableFuture<Collection<WorkerType>> recoveredWorkersFuture = new CompletableFuture<>(); + final Context context = createContext(); + context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete); + context.preparePreviousAttemptWorkers(); + context.runTest(() -> context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS))); + } + + @Test + public void testTerminate() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().terminate(); + context.validateTermination(); + }); + } + + @Test + public void testDeregisterApplication() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null); Review comment: I realize that the value is not really used in the actual implementation. But shouldn't that be reflected in the test in a way that all values of the enum are tested having the same outcome? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract common base class for implementations of {@link ResourceManagerDriver}. 
+ */ +public abstract class AbstractResourceManagerDriver<WorkerType extends ResourceIDRetrievable> + implements ResourceManagerDriver<WorkerType> { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + protected final Configuration flinkConfig; + protected final Configuration flinkClientConfig; + + private ResourceEventHandler<WorkerType> resourceEventHandler = null; + private ScheduledExecutor mainThreadExecutor = null; + + public AbstractResourceManagerDriver( + final Configuration flinkConfig, + final Configuration flinkClientConfig) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.flinkClientConfig = Preconditions.checkNotNull(flinkClientConfig); + } + + protected final ResourceEventHandler<WorkerType> getResourceEventHandler() { + return Preconditions.checkNotNull( + this.resourceEventHandler, + "Cannot get resource event handler. Resource manager driver is not initialized."); + } + + protected final ScheduledExecutor getMainThreadExecutor() { + return Preconditions.checkNotNull( Review comment: Same here: I'd say that throwing an `IllegalStateException` would be a better match for the actual error, since the private member being `null` is an implementation detail that does not need to be exposed. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceEventHandler.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.function.Consumer; + +/** + * Testing implementation of {@link ResourceEventHandler}. + */ +public class TestingResourceEventHandler<WorkerType extends ResourceIDRetrievable> implements ResourceEventHandler<WorkerType> { + + private final Consumer<Collection<WorkerType>> onPreviousAttemptWorkersRecoveredConsumer; + private final Consumer<ResourceID> onWorkerTerminatedConsumer; + private final Consumer<Throwable> onErrorConsumer; + + private TestingResourceEventHandler( + Consumer<Collection<WorkerType>> onPreviousAttemptWorkersRecoveredConsumer, + Consumer<ResourceID> onWorkerTerminatedConsumer, + Consumer<Throwable> onErrorConsumer) { + this.onPreviousAttemptWorkersRecoveredConsumer = onPreviousAttemptWorkersRecoveredConsumer; + this.onWorkerTerminatedConsumer = onWorkerTerminatedConsumer; + this.onErrorConsumer = onErrorConsumer; + } + + @Override + public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers) { + onPreviousAttemptWorkersRecoveredConsumer.accept(recoveredWorkers); + } + + @Override + public void onWorkerTerminated(ResourceID resourceId) { + onWorkerTerminatedConsumer.accept(resourceId); + } + + @Override + public void onError(Throwable exception) { + onErrorConsumer.accept(exception); + } + + public static 
class Builder<WorkerType extends ResourceIDRetrievable> { + private Consumer<Collection<WorkerType>> onPreviousAttemptWorkersRecoveredConsumer = (ignore) -> {}; + private Consumer<ResourceID> onWorkerTerminatedConsumer = (ignore) -> {}; + private Consumer<Throwable> onErrorConsumer = (ignore) -> {}; + + public Builder<WorkerType> setOnPreviousAttemptWorkersRecoveredConsumer( Review comment: That's just syntactic sugar, but: I always like having a static `builder()` method in the parent class instead of a public `Builder` constructor. This way, you won't need to use `new` when instantiating the `Builder` (like it's done [here](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/TestRestServerEndpoint.java#L39-L53)). But that's just a suggestion... ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriverTest.java ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler; +import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.TestingKubernetesPod; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase; + +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link KubernetesResourceManagerDriver}. 
+ */ +public class KubernetesResourceManagerDriverTest extends ResourceManagerDriverTestBase<KubernetesWorkerNode> { + + private final static String CLUSTER_ID = "testing-flink-cluster"; + private final static Time POD_CREATION_INTERVAL = Time.milliseconds(50L); + private final static KubernetesResourceManagerDriverConfiguration KUBERNETES_RESOURCE_MANAGER_CONFIGURATION = + new KubernetesResourceManagerDriverConfiguration(CLUSTER_ID, POD_CREATION_INTERVAL); + + @Test + public void testOnPodAdded() throws Exception { + new Context() {{ + final CompletableFuture<KubernetesPod> createPodFuture = new CompletableFuture<>(); + final CompletableFuture<KubernetesWorkerNode> requestResourceFuture = new CompletableFuture<>(); + + flinkKubeClientBuilder.setCreateTaskManagerPodFunction((pod) -> { + createPodFuture.complete(pod); + return FutureUtils.completedVoidFuture(); + }); + + runTest(() -> { + // request new pod + runInMainThread(() -> getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC).thenAccept(requestResourceFuture::complete)); + final KubernetesPod pod = createPodFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); + + // prepare validation: + // - complete requestResourceFuture in main thread with correct KubernetesWorkerNode + final CompletableFuture<Void> validationFuture = requestResourceFuture.thenAccept((workerNode) -> { + validateInMainThread(); + assertThat(workerNode.getResourceID().toString(), is(pod.getName())); + }); + + // send onAdded event + getPodCallbackHandler().onAdded(Collections.singletonList(pod)); + + // make sure finishing validation + validationFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); + }); + }}; + } + + @Test + public void testOnPodModified() throws Exception { + new Context() {{ + testOnPodTerminated((pod) -> getPodCallbackHandler().onModified(pod)); + }}; + } + + @Test + public void testOnPodDeleted() throws Exception { + new Context() {{ + testOnPodTerminated((pod) -> getPodCallbackHandler().onDeleted(pod)); + }}; + } + + @Test + public void 
testOnError() throws Exception { + new Context() {{ + testOnPodTerminated((pod) -> getPodCallbackHandler().onError(pod)); + }}; + } + + @Test + public void testFatalHandleError() throws Exception { + new Context() {{ + final CompletableFuture<Throwable> onErrorFuture = new CompletableFuture<>(); + resourceEventHandlerBuilder.setOnErrorConsumer(onErrorFuture::complete); + + runTest(() -> { + final Throwable testingError = new Throwable("testing error"); + getPodCallbackHandler().handleFatalError(testingError); + assertThat(onErrorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(testingError)); + }); + }}; + } + + @Test + public void testPodCreationInterval() throws Exception { + new Context() {{ + final AtomicInteger createPodCount = new AtomicInteger(0); + final List<CompletableFuture<Long>> createPodTimeFutures = new ArrayList<>(); + createPodTimeFutures.add(new CompletableFuture<>()); + createPodTimeFutures.add(new CompletableFuture<>()); + + flinkKubeClientBuilder.setCreateTaskManagerPodFunction((ignore) -> { + int idx = createPodCount.getAndIncrement(); + if (idx < createPodTimeFutures.size()) { + createPodTimeFutures.get(idx).complete(System.currentTimeMillis()); + } + return FutureUtils.completedExceptionally(new Throwable("testing error")); + }); + + runTest(() -> { + // re-request resource on pod creation failed + runInMainThread(() -> getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC) + .whenComplete((ignore1, ignore2) -> getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC))); + + // validate trying creating pod twice, with proper interval + long t1 = createPodTimeFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS); + long t2 = createPodTimeFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS); + assertThat((t2 - t1), greaterThanOrEqualTo(POD_CREATION_INTERVAL.toMilliseconds())); + }); + }}; + } + + @Override + protected ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context createContext() { + return new Context(); + } + + private class Context 
extends ResourceManagerDriverTestBase<KubernetesWorkerNode>.Context { + private final KubernetesPod PREVIOUS_ATTEMPT_POD = new TestingKubernetesPod(CLUSTER_ID + "-taskmanager-1-1"); Review comment: Checkstyle: non-`static` members should be camel-cased (e.g. `previousAttemptPod`) instead of upper-snake-cased. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Common test cases for implementations of {@link ResourceManagerDriver}. 
+ */ +public abstract class ResourceManagerDriverTestBase<WorkerType extends ResourceIDRetrievable> extends TestLogger { + + protected static final long TIMEOUT_SEC = 5L; + + protected static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils + .processSpecFromWorkerResourceSpec(new Configuration(), WorkerResourceSpec.ZERO); + + private static final String MAIN_THREAD_NAME = "testing-rpc-main-thread"; + private static final ScheduledExecutor MAIN_THREAD_EXECUTOR = + new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, MAIN_THREAD_NAME))); + + @Test + public void testInitialize() throws Exception { + final Context context = createContext(); + context.runTest(context::validateInitialization); + } + + @Test + public void testRecoverPreviousAttemptWorkers() throws Exception { + final CompletableFuture<Collection<WorkerType>> recoveredWorkersFuture = new CompletableFuture<>(); + final Context context = createContext(); + context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete); + context.preparePreviousAttemptWorkers(); + context.runTest(() -> context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS))); + } + + @Test + public void testTerminate() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().terminate(); + context.validateTermination(); + }); + } + + @Test + public void testDeregisterApplication() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null); + context.validateDeregisterApplication(); + }); + } + + @Test + public void testRequestResource() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.runInMainThread(() -> 
context.getDriver().requestResource(TASK_EXECUTOR_PROCESS_SPEC)); + context.validateRequestedResources(Collections.singleton(TASK_EXECUTOR_PROCESS_SPEC)); + }); + } + + @Test + public void testReleaseResource() throws Exception { + final CompletableFuture<WorkerType> requestResourceFuture = new CompletableFuture<>(); + final CompletableFuture<WorkerType> releaseResourceFuture = new CompletableFuture<>(); + final Context context = createContext(); + context.runTest(() -> { + context.runInMainThread(() -> context.getDriver() + .requestResource(TASK_EXECUTOR_PROCESS_SPEC) + .thenAccept(requestResourceFuture::complete)); + requestResourceFuture.thenApply((workerNode) -> + context.runInMainThread(() -> { + context.getDriver().releaseResource(workerNode); + releaseResourceFuture.complete(workerNode); + })); + final WorkerType worker = releaseResourceFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS); + context.validateReleaseResources(Collections.singleton(worker)); + }); + } + + protected abstract Context createContext(); + + protected abstract class Context { Review comment: Checkstyle: Missing JavaDoc ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient; + +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Testing implementation of {@link FlinkKubeClient}. + */ +public class TestingFlinkKubeClient implements FlinkKubeClient { + + private final Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction; + private final Function<String, CompletableFuture<Void>> stopPodFunction; + private final Consumer<String> stopAndCleanupClusterConsumer; + private final Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction; + private final BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction; + + private TestingFlinkKubeClient( + Function<KubernetesPod, CompletableFuture<Void>> createTaskManagerPodFunction, + Function<String, CompletableFuture<Void>> stopPodFunction, + Consumer<String> stopAndCleanupClusterConsumer, + Function<Map<String, String>, List<KubernetesPod>> getPodsWithLabelsFunction, + BiFunction<Map<String, String>, PodCallbackHandler, KubernetesWatch> watchPodsAndDoCallbackFunction) { + + this.createTaskManagerPodFunction = createTaskManagerPodFunction; + this.stopPodFunction = stopPodFunction; + this.stopAndCleanupClusterConsumer = stopAndCleanupClusterConsumer; + this.getPodsWithLabelsFunction = getPodsWithLabelsFunction; + 
this.watchPodsAndDoCallbackFunction = watchPodsAndDoCallbackFunction; + } + + @Override + public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) { + return createTaskManagerPodFunction.apply(kubernetesPod); + } + + @Override + public CompletableFuture<Void> stopPod(String podName) { + return stopPodFunction.apply(podName); + } + + @Override + public void stopAndCleanupCluster(String clusterId) { + stopAndCleanupClusterConsumer.accept(clusterId); + } + + @Override + public Optional<KubernetesService> getRestService(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<Endpoint> getRestEndpoint(String clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public List<KubernetesPod> getPodsWithLabels(Map<String, String> labels) { + return getPodsWithLabelsFunction.apply(labels); + } + + @Override + public void handleException(Exception e) { + throw new UnsupportedOperationException(); + } + + @Override + public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, PodCallbackHandler podCallbackHandler) { + return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler); + } + + @Override + public void close() throws Exception { + // noop + } + + public static class Builder { Review comment: Checkstyle: Missing JavaDoc ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriverTestBase.java ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Common test cases for implementations of {@link ResourceManagerDriver}. 
+ */ +public abstract class ResourceManagerDriverTestBase<WorkerType extends ResourceIDRetrievable> extends TestLogger { + + protected static final long TIMEOUT_SEC = 5L; + + protected static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC = TaskExecutorProcessUtils + .processSpecFromWorkerResourceSpec(new Configuration(), WorkerResourceSpec.ZERO); + + private static final String MAIN_THREAD_NAME = "testing-rpc-main-thread"; + private static final ScheduledExecutor MAIN_THREAD_EXECUTOR = + new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, MAIN_THREAD_NAME))); + + @Test + public void testInitialize() throws Exception { + final Context context = createContext(); + context.runTest(context::validateInitialization); + } + + @Test + public void testRecoverPreviousAttemptWorkers() throws Exception { + final CompletableFuture<Collection<WorkerType>> recoveredWorkersFuture = new CompletableFuture<>(); + final Context context = createContext(); + context.resourceEventHandlerBuilder.setOnPreviousAttemptWorkersRecoveredConsumer(recoveredWorkersFuture::complete); + context.preparePreviousAttemptWorkers(); + context.runTest(() -> context.validateWorkersRecoveredFromPreviousAttempt(recoveredWorkersFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS))); + } + + @Test + public void testTerminate() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().terminate(); + context.validateTermination(); + }); + } + + @Test + public void testDeregisterApplication() throws Exception { + final Context context = createContext(); + context.runTest(() -> { + context.getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null); Review comment: Is it enough to test for `ApplicationStatus.SUCCEEDED` here? I'm asking since there are other values in the `ApplicationStatus` enum that are not tested. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.active; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import javax.annotation.Nullable; + +/** + * Factory class for creating {@link ActiveResourceManager} with various implementations of {@link ResourceManagerDriver}. + */ +public abstract class ActiveResourceManagerFactory<WorkerType extends ResourceIDRetrievable> + extends ResourceManagerFactory<WorkerType> { + + @Override + public ResourceManager<WorkerType> createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl, + MetricRegistry metricRegistry, + String hostname) throws Exception { + return super.createResourceManager( + createActiveResourceManagerConfiguration(configuration), + resourceId, + rpcService, + highAvailabilityServices, + heartbeatServices, + fatalErrorHandler, + clusterInformation, + webInterfaceUrl, + metricRegistry, + hostname); + } + + private Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) { + final Configuration copiedConfig = new Configuration(originalConfiguration); + // In active mode, it's depend on the ResourceManager to set the ResourceID of TaskManagers. Review comment: ```suggestion // In active mode, it is up to the ResourceManager to set the ResourceID of TaskManagers. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
