This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/6.0 by this push:
new becf708 Add kubernetes as a new cluster manager.
new 58b83e8 Merge remote-tracking branch 'origin/6.0' into 6.0
becf708 is described below
commit becf708073b81e7358ec9dabc5e5b0a9b86d12c8
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jul 26 00:01:26 2018 +0800
Add kubernetes as a new cluster manager.
# 1447
Use `GET /api/v1/watch/namespaces/{namespace}/pods` api to watch pod's
containerIp which help collector containers to discovery each other.
---
oap-server/pom.xml | 7 ++
.../{ => cluster-kubernetes-plugin}/pom.xml | 23 +++--
.../kubernetes/ClusterModuleKubernetesConfig.java | 65 ++++++++++++
.../ClusterModuleKubernetesProvider.java | 76 ++++++++++++++
.../server/cluster/plugin/kubernetes/Event.java | 48 +++++++++
.../plugin/kubernetes/KubernetesCoordinator.java | 113 +++++++++++++++++++++
.../cluster/plugin/kubernetes/ReusableWatch.java | 33 ++++++
.../dependencies/NamespacedPodListWatch.java | 88 ++++++++++++++++
.../kubernetes/dependencies/UidEnvSupplier.java | 38 +++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++++
.../ClusterModuleKubernetesProviderTest.java | 65 ++++++++++++
.../kubernetes/KubernetesCoordinatorTest.java | 72 +++++++++++++
.../plugin/kubernetes/fixture/PlainWatch.java | 87 ++++++++++++++++
.../src/test/resources/log4j2.xml | 31 ++++++
oap-server/server-cluster-plugin/pom.xml | 1 +
.../oap/server/core/cluster/RemoteInstance.java | 10 ++
.../src/main/resources/application.yml | 11 +-
17 files changed, 773 insertions(+), 14 deletions(-)
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 6aeaac0..4c57bdc 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -54,6 +54,8 @@
<shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<elasticsearch.version>6.3.1</elasticsearch.version>
+ <joda-time.version>2.9.9</joda-time.version>
+ <kubernetes.version>2.0.0</kubernetes.version>
</properties>
<dependencies>
@@ -224,6 +226,11 @@
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.kubernetes</groupId>
+ <artifactId>client-java</artifactId>
+ <version>${kubernetes.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/pom.xml
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
similarity index 75%
copy from oap-server/server-cluster-plugin/pom.xml
copy to oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 408fca6..3a507a1 100644
--- a/oap-server/server-cluster-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -21,29 +21,30 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>oap-server</artifactId>
+ <artifactId>server-cluster-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>server-cluster-plugin</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>cluster-zookeeper-plugin</module>
- <module>cluster-standalone-plugin</module>
- </modules>
+ <artifactId>cluster-kubernetes-plugin</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
- <artifactId>library-module</artifactId>
+ <artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>library-util</artifactId>
- <version>${project.version}</version>
+ <groupId>io.kubernetes</groupId>
+ <artifactId>client-java</artifactId>
</dependency>
</dependencies>
+
+
</project>
\ No newline at end of file
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
new file mode 100644
index 0000000..84ca8bc
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+/**
+ * The configuration of the module of cluster.kubernetes
+ *
+ * @author gaohongtao
+ */
+public class ClusterModuleKubernetesConfig extends ModuleConfig {
+ private int watchTimeoutSeconds;
+ private String namespace;
+ private String labelSelector;
+ private String uidEnvName;
+
+ public int getWatchTimeoutSeconds() {
+ return watchTimeoutSeconds;
+ }
+
+ public void setWatchTimeoutSeconds(int watchTimeoutSeconds) {
+ this.watchTimeoutSeconds = watchTimeoutSeconds;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public String getLabelSelector() {
+ return labelSelector;
+ }
+
+ public void setLabelSelector(String labelSelector) {
+ this.labelSelector = labelSelector;
+ }
+
+ public String getUidEnvName() {
+ return uidEnvName;
+ }
+
+ public void setUidEnvName(String uidEnvName) {
+ this.uidEnvName = uidEnvName;
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
new file mode 100644
index 0000000..659e2ab
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
+import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+
+/**
+ * Use kubernetes to manage all instances in Skywalking cluster.
+ *
+ * @author gaohongtao
+ */
+public class ClusterModuleKubernetesProvider extends ModuleProvider {
+
+ private final ClusterModuleKubernetesConfig config;
+
+ public ClusterModuleKubernetesProvider() {
+ super();
+ this.config = new ClusterModuleKubernetesConfig();
+ }
+
+ @Override public String name() {
+ return "kubernetes";
+ }
+
+ @Override public Class<? extends ModuleDefine> module() {
+ return ClusterModule.class;
+ }
+
+ @Override public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override public void prepare() throws ServiceNotProvidedException {
+ KubernetesCoordinator coordinator = new KubernetesCoordinator(
+ new NamespacedPodListWatch(config.getNamespace(),
config.getLabelSelector(), config.getWatchTimeoutSeconds()),
+ new UidEnvSupplier(config.getUidEnvName()));
+ this.registerServiceImplementation(ClusterRegister.class, coordinator);
+ this.registerServiceImplementation(ClusterNodesQuery.class,
coordinator);
+ }
+
+ @Override public void start() {
+
+ }
+
+ @Override public void notifyAfterCompleted() {
+
+ }
+
+ @Override public String[] requiredModules() {
+ return new String[0];
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
new file mode 100644
index 0000000..35b6f06
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+/**
+ * The event of watch.
+ *
+ * @author gaohongtao
+ */
+public class Event {
+ private final String type;
+ private final String uid;
+ private final String host;
+
+ public Event(final String type, final String uid, final String host) {
+ this.type = type;
+ this.uid = uid;
+ this.host = host;
+ }
+
+ String getType() {
+ return type;
+ }
+
+ String getUid() {
+ return uid;
+ }
+
+ String getHost() {
+ return host;
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
new file mode 100644
index 0000000..ca3446c
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read collector pod info from api-server of kubernetes, then using all
containerIp list to
+ * construct the list of {@link RemoteInstance}.
+ *
+ * @author gaohongtao
+ */
+public class KubernetesCoordinator implements ClusterRegister,
ClusterNodesQuery {
+
+ private static final Logger logger =
LoggerFactory.getLogger(KubernetesCoordinator.class);
+
+ private final String uid;
+
+ private final Map<String, RemoteInstance> cache = new
ConcurrentHashMap<>();
+
+ private final ReusableWatch<Event> watch;
+
+ private int port;
+
+ KubernetesCoordinator(final ReusableWatch<Event> watch, final
Supplier<String> uidSupplier) {
+ this.watch = watch;
+ this.uid = uidSupplier.get();
+ }
+
+ @Override public void registerRemote(RemoteInstance remoteInstance) throws
ServiceRegisterException {
+ this.port = remoteInstance.getPort();
+
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
ThreadFactoryBuilder()
+
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
+ }
+
+ private void submitTask(final ListeningExecutorService service) {
+ watch.initOrReset();
+ ListenableFuture<?> watchFuture = service.submit(newWatch());
+ Futures.addCallback(watchFuture, new FutureCallback<Object>() {
+ @Override public void onSuccess(@Nullable Object ignored) {
+ submitTask(service);
+ }
+
+ @Override public void onFailure(@Nullable Throwable throwable) {
+ logger.debug("Generate remote nodes error", throwable);
+ submitTask(service);
+ }
+ });
+ }
+
+ private Callable<Object> newWatch() {
+ return () -> {
+ generateRemoteNodes();
+ return null;
+ };
+ }
+
+ private void generateRemoteNodes() {
+ for (Event event : watch) {
+ logger.debug("Received event {} {}-{}", event.getType(),
event.getUid(), event.getHost());
+ switch (event.getType()) {
+ case "ADDED":
+ case "MODIFIED":
+ cache.put(event.getUid(), new
RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
+ break;
+ case "DELETED":
+ cache.remove(event.getUid());
+ break;
+ default:
+ throw new RuntimeException(String.format("Unknown event
%s", event.getType()));
+ }
+ }
+ }
+
+ @Override public List<RemoteInstance> queryRemoteNodes() {
+ return Lists.newArrayList(cache.values());
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
new file mode 100644
index 0000000..bbe5ea9
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+/**
+ * This watch can init or reset internal state.
+ *
+ * @param <T> event of watch
+ * @author gaohongtao
+ */
+public interface ReusableWatch<T> extends Iterable<T> {
+
+ /**
+ * Reset internal state.
+ */
+ void initOrReset();
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
new file mode 100644
index 0000000..915bf86
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
+
+import com.google.common.collect.Iterators;
+import com.google.common.reflect.TypeToken;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.util.Config;
+import io.kubernetes.client.util.Watch;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
+import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
+
+/**
+ * Watch the api {@literal
https://v1-9.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.9/#watch-64}.
+ *
+ * @author gaohongtao
+ */
+public class NamespacedPodListWatch implements ReusableWatch<Event> {
+
+ private final CoreV1Api api = new CoreV1Api();
+
+ private final String namespace;
+
+ private final String labelSelector;
+
+ private final int watchTimeoutSeconds;
+
+ private Watch<V1Pod> watch;
+
+ public NamespacedPodListWatch(final String namespace, final String
labelSelector, final int watchTimeoutSeconds) {
+ this.namespace = namespace;
+ this.labelSelector = labelSelector;
+ this.watchTimeoutSeconds = watchTimeoutSeconds;
+ }
+
+ @Override public void initOrReset() {
+ ApiClient client;
+ try {
+ client = Config.defaultClient();
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ client.getHttpClient().setReadTimeout(watchTimeoutSeconds,
TimeUnit.SECONDS);
+ Configuration.setDefaultApiClient(client);
+ try {
+ watch = Watch.createWatch(
+ client,
+ api.listNamespacedPodCall(namespace, null, null, null,
+ null, labelSelector, Integer.MAX_VALUE,null,null,
Boolean.TRUE,
+ null, null),
+ new TypeToken<Watch.Response<V1Pod>>() { }.getType());
+ } catch (ApiException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override public Iterator<Event> iterator() {
+ return Iterators.transform(watch.iterator(), response -> {
+ if (response == null) {
+ throw new NullPointerException("Original event is null");
+ }
+ return new Event(response.type,
response.object.getMetadata().getUid(), response.object.getStatus().getPodIP());
+ });
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
new file mode 100644
index 0000000..9512c1f
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
+
+import java.util.function.Supplier;
+
+/**
+ * Supply uid from environment variable.
+ *
+ * @author gaohongtao
+ */
+public class UidEnvSupplier implements Supplier<String> {
+
+ private final String uidEnvName;
+
+ public UidEnvSupplier(final String uidEnvName) {
+ this.uidEnvName = uidEnvName == null ? "" : uidEnvName;
+ }
+ @Override public String get() {
+ return System.getenv(uidEnvName);
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..81818d4
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ClusterModuleKubernetesProvider
\ No newline at end of file
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
new file mode 100644
index 0000000..1ba4bed
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterModuleKubernetesProviderTest {
+
+ private ClusterModuleKubernetesProvider provider;
+
+ @Before
+ public void setUp() {
+ provider = new ClusterModuleKubernetesProvider();
+ }
+
+ @Test
+ public void assertName() {
+ assertThat(provider.name(), is("kubernetes"));
+ }
+
+ @Test
+ public void assertModule() {
+ assertTrue(provider.module().isAssignableFrom(ClusterModule.class));
+ }
+
+ @Test
+ public void assertCreateConfigBeanIfAbsent() {
+
assertTrue(ClusterModuleKubernetesConfig.class.isInstance(provider.createConfigBeanIfAbsent()));
+ }
+
+ @Test
+ public void assertPrepare() throws ServiceNotProvidedException {
+ provider.prepare();
+ ClusterRegister register = provider.getService(ClusterRegister.class);
+ ClusterNodesQuery query = provider.getService(ClusterNodesQuery.class);
+ assertSame(register, query);
+ assertTrue(KubernetesCoordinator.class.isInstance(register));
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
new file mode 100644
index 0000000..35c57f8
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class KubernetesCoordinatorTest {
+
+ private KubernetesCoordinator coordinator;
+
+
+ @Test
+ public void assertAdded() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1",
"ADDED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
is("10.0.0.1"));
+ }
+
+ @Test
+ public void assertModified() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1",
"ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+ coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
is("10.0.0.3"));
+ }
+
+ @Test
+ public void assertDeleted() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1",
"ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(1));
+
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
is("10.0.0.1"));
+ }
+
+ @Test
+ public void assertError() throws InterruptedException {
+ PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1",
"ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+ coordinator = new KubernetesCoordinator(watch, () -> "1");
+ coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+ watch.await();
+ assertThat(coordinator.queryRemoteNodes().size(), is(2));
+
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
is("10.0.0.1"));
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
new file mode 100644
index 0000000..effab35
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
+import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
+
+public class PlainWatch implements ReusableWatch<Event> {
+
+ public static PlainWatch create(final int size, final String... args) {
+ List<Event> events = new ArrayList<>(args.length / 3);
+ for (int i = 0; i < args.length; i++) {
+ events.add(new Event(args[i++], args[i++], args[i]));
+ }
+ return new PlainWatch(events, size);
+ }
+
+ private final List<Event> events;
+
+ private final int size;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private Iterator<Event> iterator;
+
+ private int count;
+
+ private PlainWatch(final List<Event> events, final int size) {
+ this.events = events;
+ this.size = size;
+ }
+
+ @Override public void initOrReset() {
+ final Iterator<Event> internal = events.subList(count,
events.size()).iterator();
+ iterator = new Iterator<Event>() {
+ public boolean hasNext() {
+ boolean result = count < size && internal.hasNext();
+ if (!result) {
+ latch.countDown();
+ }
+ return result;
+ }
+
+ public Event next() {
+ if (!this.hasNext()) {
+ throw new NoSuchElementException();
+ } else {
+ ++count;
+ return internal.next();
+ }
+ }
+
+ public void remove() {
+ internal.remove();
+ }
+ };
+ }
+
+ @Override public Iterator<Event> iterator() {
+ return iterator;
+ }
+
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+}
diff --git
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..bc91b6d
--- /dev/null
+++
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<Configuration status="info">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x
- %m%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="debug">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/oap-server/server-cluster-plugin/pom.xml
b/oap-server/server-cluster-plugin/pom.xml
index 408fca6..2e65980 100644
--- a/oap-server/server-cluster-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>cluster-zookeeper-plugin</module>
<module>cluster-standalone-plugin</module>
+ <module>cluster-kubernetes-plugin</module>
</modules>
<dependencies>
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 2c8d761..53c4ea0 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -29,6 +29,16 @@ public class RemoteInstance {
private int port;
private boolean self = false;
+ public RemoteInstance() {
+
+ }
+
+ public RemoteInstance(String host, int port, boolean self) {
+ this.host = host;
+ this.port = port;
+ this.self = self;
+ }
+
public String getHost() {
return host;
}
diff --git a/oap-server/server-starter/src/main/resources/application.yml
b/oap-server/server-starter/src/main/resources/application.yml
index fac7415..2f27b61 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -15,18 +15,23 @@
# limitations under the License.
cluster:
- standalone:
+ standalone:
# zookeeper:
# hostPort: localhost:2181
# # Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
+# kubernetes:
+# watchTimeoutSeconds: 60
+# namespace: default
+# labelSelector: app=collector,release=skywalking
+# uidEnvName: SKYWALKING_COLLECTOR_UID
core:
default:
- restHost: localhost
+ restHost: 0.0.0.0
restPort: 12800
restContextPath: /
- gRPCHost: localhost
+ gRPCHost: 0.0.0.0
gRPCPort: 11800
storage:
elasticsearch: