This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 460e179 [FLINK-20798][k8s] Use namespaced kubernetes client when
creating FlinkKubeClient
460e179 is described below
commit 460e1799d2b9069835ecae295c75f571c7d6b65a
Author: wangyang0918 <[email protected]>
AuthorDate: Wed Jan 6 20:24:11 2021 +0800
[FLINK-20798][k8s] Use namespaced kubernetes client when creating
FlinkKubeClient
After using namespaced kubernetes client, we will not need to always set
the namespace when creating kubernetes resources(e.g. deployment, pods,
configmap, watch, etc.).
Address comments: Update the unit test to verify the configmap watch is
created in appropriate namespace
This closes #14570.
---
.../kubeclient/DefaultKubeClientFactory.java | 8 +++-
.../kubeclient/Fabric8FlinkKubeClient.java | 55 ++++------------------
.../resources/KubernetesLeaderElector.java | 3 +-
.../flink/kubernetes/KubernetesTestBase.java | 37 ++++++++++++++-
.../flink/kubernetes/MixedKubernetesServer.java | 17 +++++--
.../kubeclient/Fabric8FlinkKubeClientTest.java | 24 ++++++++++
.../kubeclient/TestingFlinkKubeClient.java | 5 +-
.../resources/NoOpWatchCallbackHandler.java | 55 ++++++++++++++++++++++
8 files changed, 147 insertions(+), 57 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
index 59f157a..e7b5759 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/DefaultKubeClientFactory.java
@@ -25,8 +25,8 @@ import org.apache.flink.util.FileUtils;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +83,11 @@ public class DefaultKubeClientFactory implements
KubeClientFactory {
config = Config.autoConfigure(kubeContext);
}
- final KubernetesClient client = new DefaultKubernetesClient(config);
+ final String namespace =
flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
+ LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
+ config.setNamespace(namespace);
+
+ final NamespacedKubernetesClient client = new
DefaultKubernetesClient(config);
return new Fabric8FlinkKubeClient(flinkConfig, client, () ->
ioExecutor);
}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index bd2943c..779b369 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -44,7 +44,6 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.slf4j.Logger;
@@ -69,7 +68,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
private static final Logger LOG =
LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
- private final KubernetesClient internalClient;
+ private final NamespacedKubernetesClient internalClient;
private final String clusterId;
private final String namespace;
private final int maxRetryAttempts;
@@ -78,7 +77,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
public Fabric8FlinkKubeClient(
Configuration flinkConfig,
- KubernetesClient client,
+ NamespacedKubernetesClient client,
Supplier<Executor> asyncExecutorFactory) {
this.internalClient = checkNotNull(client);
this.clusterId =
checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
@@ -100,19 +99,12 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
// create Deployment
LOG.debug("Start to create deployment with spec {}",
deployment.getSpec().toString());
final Deployment createdDeployment =
- this.internalClient
- .apps()
- .deployments()
- .inNamespace(this.namespace)
- .create(deployment);
+ this.internalClient.apps().deployments().create(deployment);
// Note that we should use the uid of the created Deployment for the
OwnerReference.
setOwnerReference(createdDeployment, accompanyingResources);
- this.internalClient
- .resourceList(accompanyingResources)
- .inNamespace(this.namespace)
- .createOrReplace();
+
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
@Override
@@ -123,7 +115,6 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
this.internalClient
.apps()
.deployments()
- .inNamespace(this.namespace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.get();
@@ -146,10 +137,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
kubernetesPod.getInternalResource().getMetadata(),
kubernetesPod.getInternalResource().getSpec());
- this.internalClient
- .pods()
- .inNamespace(this.namespace)
- .create(kubernetesPod.getInternalResource());
+
this.internalClient.pods().create(kubernetesPod.getInternalResource());
},
kubeClientExecutorService);
}
@@ -201,7 +189,6 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
this.internalClient
.apps()
.deployments()
- .inNamespace(this.namespace)
.withName(KubernetesUtils.getDeploymentName(clusterId))
.cascading(true)
.delete();
@@ -217,12 +204,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
final String serviceName =
ExternalServiceDecorator.getExternalServiceName(clusterId);
final Service service =
- this.internalClient
- .services()
- .inNamespace(namespace)
- .withName(serviceName)
- .fromServer()
- .get();
+
this.internalClient.services().withName(serviceName).fromServer().get();
if (service == null) {
LOG.debug("Service {} does not exist", serviceName);
@@ -247,10 +229,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
KubernetesLeaderElector.LeaderCallbackHandler
leaderCallbackHandler) {
return new KubernetesLeaderElector(
- (NamespacedKubernetesClient) this.internalClient,
- namespace,
- leaderElectionConfiguration,
- leaderCallbackHandler);
+ this.internalClient, leaderElectionConfiguration,
leaderCallbackHandler);
}
@Override
@@ -260,7 +239,6 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
() ->
this.internalClient
.configMaps()
- .inNamespace(namespace)
.create(configMap.getInternalResource()),
kubeClientExecutorService)
.exceptionally(
@@ -274,8 +252,7 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
@Override
public Optional<KubernetesConfigMap> getConfigMap(String name) {
- final ConfigMap configMap =
-
this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+ final ConfigMap configMap =
this.internalClient.configMaps().withName(name).get();
return configMap == null
? Optional.empty()
: Optional.of(new KubernetesConfigMap(configMap));
@@ -299,8 +276,6 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
this
.internalClient
.configMaps()
-
.inNamespace(
-
namespace)
.withName(
configMapName)
.lockResourceVersion(
@@ -353,24 +328,14 @@ public class Fabric8FlinkKubeClient implements
FlinkKubeClient {
@Override
public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String,
String> labels) {
return CompletableFuture.runAsync(
- () ->
- this.internalClient
- .configMaps()
- .inNamespace(namespace)
- .withLabels(labels)
- .delete(),
+ () ->
this.internalClient.configMaps().withLabels(labels).delete(),
kubeClientExecutorService);
}
@Override
public CompletableFuture<Void> deleteConfigMap(String configMapName) {
return CompletableFuture.runAsync(
- () ->
- this.internalClient
- .configMaps()
- .inNamespace(namespace)
- .withName(configMapName)
- .delete(),
+ () ->
this.internalClient.configMaps().withName(configMapName).delete(),
kubeClientExecutorService);
}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index abdf0e2..82a0bf9 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -61,7 +61,6 @@ public class KubernetesLeaderElector {
public KubernetesLeaderElector(
NamespacedKubernetesClient kubernetesClient,
- String namespace,
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
final LeaderElectionConfig leaderElectionConfig =
@@ -70,7 +69,7 @@ public class KubernetesLeaderElector {
.withLeaseDuration(leaderConfig.getLeaseDuration())
.withLock(
new ConfigMapLock(
- namespace,
+ kubernetesClient.getNamespace(),
leaderConfig.getConfigMapName(),
leaderConfig.getLockIdentity()))
.withRenewDeadline(leaderConfig.getRenewDeadline())
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
index 7f452ee..4b9b26c 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
@@ -31,7 +31,12 @@ import
org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.util.TestLogger;
-import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.api.model.Config;
+import io.fabric8.kubernetes.api.model.ConfigBuilder;
+import io.fabric8.kubernetes.api.model.NamedClusterBuilder;
+import io.fabric8.kubernetes.api.model.NamedContextBuilder;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.utils.Serialization;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -66,7 +71,7 @@ public class KubernetesTestBase extends TestLogger {
protected final Configuration flinkConfig = new Configuration();
- protected KubernetesClient kubeClient;
+ protected NamespacedKubernetesClient kubeClient;
protected FlinkKubeClient flinkKubeClient;
@@ -132,4 +137,32 @@ public class KubernetesTestBase extends TestLogger {
KubernetesTestUtils.createTemporyFile("some keytab", kerberosDir,
KEYTAB_FILE);
KubernetesTestUtils.createTemporyFile("some conf", kerberosDir,
KRB5_CONF_FILE);
}
+
+ protected String writeKubeConfigForMockKubernetesServer() throws Exception
{
+ final Config kubeConfig =
+ new ConfigBuilder()
+ .withApiVersion(server.getClient().getApiVersion())
+ .withClusters(
+ new NamedClusterBuilder()
+ .withName(CLUSTER_ID)
+ .withNewCluster()
+
.withNewServer(server.getClient().getMasterUrl().toString())
+ .withInsecureSkipTlsVerify(true)
+ .endCluster()
+ .build())
+ .withContexts(
+ new NamedContextBuilder()
+ .withName(CLUSTER_ID)
+ .withNewContext()
+ .withCluster(CLUSTER_ID)
+ .withUser(
+
server.getClient().getConfiguration().getUsername())
+ .endContext()
+ .build())
+ .withNewCurrentContext(CLUSTER_ID)
+ .build();
+ final File kubeConfigFile = new
File(temporaryFolder.newFolder(".kube"), "config");
+ Serialization.yamlMapper().writeValue(kubeConfigFile, kubeConfig);
+ return kubeConfigFile.getAbsolutePath();
+ }
}
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
index 57b9a9d..acdf697 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/MixedKubernetesServer.java
@@ -25,23 +25,28 @@ import io.fabric8.mockwebserver.ServerRequest;
import io.fabric8.mockwebserver.ServerResponse;
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
import org.junit.rules.ExternalResource;
import java.util.HashMap;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
/** The mock server that host MixedDispatcher. */
public class MixedKubernetesServer extends ExternalResource {
private KubernetesMockServer mock;
private NamespacedKubernetesClient client;
- private boolean https;
+ private final boolean https;
- private boolean crudMode;
+ private final boolean crudMode;
+
+ private final MockWebServer mockWebServer;
public MixedKubernetesServer(boolean https, boolean crudMode) {
this.https = https;
this.crudMode = crudMode;
+ mockWebServer = new MockWebServer();
}
public void before() {
@@ -50,11 +55,11 @@ public class MixedKubernetesServer extends ExternalResource
{
crudMode
? new KubernetesMockServer(
new Context(),
- new MockWebServer(),
+ mockWebServer,
response,
new MixedDispatcher(response),
true)
- : new KubernetesMockServer(https);
+ : new KubernetesMockServer(mockWebServer, response,
https);
mock.init();
client = mock.createClient();
}
@@ -68,6 +73,10 @@ public class MixedKubernetesServer extends ExternalResource {
return client;
}
+ public RecordedRequest takeRequest(long timeout, TimeUnit unit) throws
Exception {
+ return mockWebServer.takeRequest(timeout, unit);
+ }
+
public MockServerExpectation expect() {
return mock.expect();
}
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
index 3c49e03..b0195ef 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
@@ -33,6 +33,8 @@ import
org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactor
import
org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import
org.apache.flink.kubernetes.kubeclient.resources.NoOpWatchCallbackHandler;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
@@ -42,6 +44,7 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
+import okhttp3.mockwebserver.RecordedRequest;
import org.junit.Test;
import java.util.HashMap;
@@ -50,6 +53,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
@@ -64,6 +68,7 @@ import static org.junit.Assert.fail;
/** Tests for Fabric implementation of {@link FlinkKubeClient}. */
public class Fabric8FlinkKubeClientTest extends KubernetesClientTestBase {
+ private static final long TIMEOUT = 10 * 1000;
private static final int RPC_PORT = 7123;
private static final int BLOB_SERVER_PORT = 8346;
@@ -456,6 +461,25 @@ public class Fabric8FlinkKubeClientTest extends
KubernetesClientTestBase {
}
}
+ @Test
+ public void testWatchConfigMaps() throws Exception {
+ final String kubeConfigFile = writeKubeConfigForMockKubernetesServer();
+ flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE,
kubeConfigFile);
+
+ final FlinkKubeClient realFlinkKubeClient =
+
DefaultKubeClientFactory.getInstance().fromConfiguration(flinkConfig);
+ realFlinkKubeClient.watchConfigMaps(CLUSTER_ID, new
NoOpWatchCallbackHandler<>());
+ final String path =
+ "/api/v1/namespaces/"
+ + NAMESPACE
+ + "/configmaps?fieldSelector=metadata.name%3D"
+ + CLUSTER_ID
+ + "&watch=true";
+ final RecordedRequest watchRequest = server.takeRequest(TIMEOUT,
TimeUnit.MILLISECONDS);
+ assertThat(watchRequest.getPath(), is(path));
+ assertThat(watchRequest.getMethod(),
is(HttpMethodWrapper.GET.toString()));
+ }
+
private KubernetesConfigMap buildTestingConfigMap() {
final Map<String, String> data = new HashMap<>();
data.put(TESTING_CONFIG_MAP_KEY, TESTING_CONFIG_MAP_VALUE);
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
index 5879dcd..c046613 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
@@ -27,6 +27,8 @@ import
org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.Preconditions;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -413,12 +415,11 @@ public class TestingFlinkKubeClient implements
FlinkKubeClient {
/** Testing implementation of {@link KubernetesLeaderElector}. */
public static class TestingKubernetesLeaderElector extends
KubernetesLeaderElector {
- private static final String NAMESPACE = "test";
public TestingKubernetesLeaderElector(
KubernetesLeaderElectionConfiguration leaderConfig,
LeaderCallbackHandler leaderCallbackHandler) {
- super(null, NAMESPACE, leaderConfig, leaderCallbackHandler);
+ super(new KubernetesMockServer().createClient(), leaderConfig,
leaderCallbackHandler);
}
@Override
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
new file mode 100644
index 0000000..6e64a4d
--- /dev/null
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/NoOpWatchCallbackHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import java.util.List;
+
+/**
+ * Empty implementation of {@link FlinkKubeClient.WatchCallbackHandler}.
+ *
+ * @param <T> Type of resource to be watched
+ */
+public class NoOpWatchCallbackHandler<T> implements
FlinkKubeClient.WatchCallbackHandler<T> {
+ @Override
+ public void onAdded(List<T> resources) {
+ // noop
+ }
+
+ @Override
+ public void onModified(List<T> resources) {
+ // noop
+ }
+
+ @Override
+ public void onDeleted(List<T> resources) {
+ // noop
+ }
+
+ @Override
+ public void onError(List<T> resources) {
+ // noop
+ }
+
+ @Override
+ public void handleFatalError(Throwable throwable) {
+ // noop
+ }
+}