This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch 6.0
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/6.0 by this push:
     new becf708  Add kubernetes as a new cluster manager.
     new 58b83e8  Merge remote-tracking branch 'origin/6.0' into 6.0
becf708 is described below

commit becf708073b81e7358ec9dabc5e5b0a9b86d12c8
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jul 26 00:01:26 2018 +0800

    Add kubernetes as a new cluster manager.
    
    # 1447
    Use `GET /api/v1/watch/namespaces/{namespace}/pods` api to watch pod's
    containerIp which help collector containers to discovery each other.
---
 oap-server/pom.xml                                 |   7 ++
 .../{ => cluster-kubernetes-plugin}/pom.xml        |  23 +++--
 .../kubernetes/ClusterModuleKubernetesConfig.java  |  65 ++++++++++++
 .../ClusterModuleKubernetesProvider.java           |  76 ++++++++++++++
 .../server/cluster/plugin/kubernetes/Event.java    |  48 +++++++++
 .../plugin/kubernetes/KubernetesCoordinator.java   | 113 +++++++++++++++++++++
 .../cluster/plugin/kubernetes/ReusableWatch.java   |  33 ++++++
 .../dependencies/NamespacedPodListWatch.java       |  88 ++++++++++++++++
 .../kubernetes/dependencies/UidEnvSupplier.java    |  38 +++++++
 ...alking.oap.server.library.module.ModuleProvider |  19 ++++
 .../ClusterModuleKubernetesProviderTest.java       |  65 ++++++++++++
 .../kubernetes/KubernetesCoordinatorTest.java      |  72 +++++++++++++
 .../plugin/kubernetes/fixture/PlainWatch.java      |  87 ++++++++++++++++
 .../src/test/resources/log4j2.xml                  |  31 ++++++
 oap-server/server-cluster-plugin/pom.xml           |   1 +
 .../oap/server/core/cluster/RemoteInstance.java    |  10 ++
 .../src/main/resources/application.yml             |  11 +-
 17 files changed, 773 insertions(+), 14 deletions(-)

diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 6aeaac0..4c57bdc 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -54,6 +54,8 @@
         <shardingjdbc.version>2.0.3</shardingjdbc.version>
         <commons-dbcp.version>1.4</commons-dbcp.version>
         <elasticsearch.version>6.3.1</elasticsearch.version>
+        <joda-time.version>2.9.9</joda-time.version>
+        <kubernetes.version>2.0.0</kubernetes.version>
     </properties>
 
     <dependencies>
@@ -224,6 +226,11 @@
                 <artifactId>commons-dbcp</artifactId>
                 <version>${commons-dbcp.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.kubernetes</groupId>
+                <artifactId>client-java</artifactId>
+                <version>${kubernetes.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
\ No newline at end of file
diff --git a/oap-server/server-cluster-plugin/pom.xml 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
similarity index 75%
copy from oap-server/server-cluster-plugin/pom.xml
copy to oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
index 408fca6..3a507a1 100644
--- a/oap-server/server-cluster-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml
@@ -21,29 +21,30 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>oap-server</artifactId>
+        <artifactId>server-cluster-plugin</artifactId>
         <groupId>org.apache.skywalking</groupId>
         <version>6.0.0-alpha-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>server-cluster-plugin</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>cluster-zookeeper-plugin</module>
-        <module>cluster-standalone-plugin</module>
-    </modules>
+    <artifactId>cluster-kubernetes-plugin</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.skywalking</groupId>
-            <artifactId>library-module</artifactId>
+            <artifactId>server-core</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.skywalking</groupId>
-            <artifactId>library-util</artifactId>
-            <version>${project.version}</version>
+            <groupId>io.kubernetes</groupId>
+            <artifactId>client-java</artifactId>
         </dependency>
     </dependencies>
+
+
 </project>
\ No newline at end of file
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
new file mode 100644
index 0000000..84ca8bc
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+/**
+ * The configuration of the module of cluster.kubernetes
+ *
+ * @author gaohongtao
+ */
+public class ClusterModuleKubernetesConfig extends ModuleConfig {
+    private int watchTimeoutSeconds;
+    private String namespace;
+    private String labelSelector;
+    private String uidEnvName;
+
+    public int getWatchTimeoutSeconds() {
+        return watchTimeoutSeconds;
+    }
+
+    public void setWatchTimeoutSeconds(int watchTimeoutSeconds) {
+        this.watchTimeoutSeconds = watchTimeoutSeconds;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public String getLabelSelector() {
+        return labelSelector;
+    }
+
+    public void setLabelSelector(String labelSelector) {
+        this.labelSelector = labelSelector;
+    }
+
+    public String getUidEnvName() {
+        return uidEnvName;
+    }
+
+    public void setUidEnvName(String uidEnvName) {
+        this.uidEnvName = uidEnvName;
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
new file mode 100644
index 0000000..659e2ab
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch;
+import 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+
+/**
+ * Use kubernetes to manage all instances in Skywalking cluster.
+ *
+ * @author gaohongtao
+ */
+public class ClusterModuleKubernetesProvider extends ModuleProvider {
+
+    private final ClusterModuleKubernetesConfig config;
+
+    public ClusterModuleKubernetesProvider() {
+        super();
+        this.config = new ClusterModuleKubernetesConfig();
+    }
+
+    @Override public String name() {
+        return "kubernetes";
+    }
+
+    @Override public Class<? extends ModuleDefine> module() {
+        return ClusterModule.class;
+    }
+
+    @Override public ModuleConfig createConfigBeanIfAbsent() {
+        return config;
+    }
+
+    @Override public void prepare() throws ServiceNotProvidedException {
+        KubernetesCoordinator coordinator = new KubernetesCoordinator(
+            new NamespacedPodListWatch(config.getNamespace(), 
config.getLabelSelector(), config.getWatchTimeoutSeconds()),
+            new UidEnvSupplier(config.getUidEnvName()));
+        this.registerServiceImplementation(ClusterRegister.class, coordinator);
+        this.registerServiceImplementation(ClusterNodesQuery.class, 
coordinator);
+    }
+
+    @Override public void start() {
+
+    }
+
+    @Override public void notifyAfterCompleted() {
+
+    }
+
+    @Override public String[] requiredModules() {
+        return new String[0];
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
new file mode 100644
index 0000000..35b6f06
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+/**
+ * The event of watch.
+ *
+ * @author gaohongtao
+ */
+public class Event {
+    private final String type;
+    private final String uid;
+    private final String host;
+
+    public Event(final String type, final String uid, final String host) {
+        this.type = type;
+        this.uid = uid;
+        this.host = host;
+    }
+
+    String getType() {
+        return type;
+    }
+
+    String getUid() {
+        return uid;
+    }
+
+    String getHost() {
+        return host;
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
new file mode 100644
index 0000000..ca3446c
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read collector pod info from api-server of kubernetes, then using all 
containerIp list to
+ * construct the list of {@link RemoteInstance}.
+ *
+ * @author gaohongtao
+ */
+public class KubernetesCoordinator implements ClusterRegister, 
ClusterNodesQuery {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(KubernetesCoordinator.class);
+
+    private final String uid;
+
+    private final Map<String, RemoteInstance> cache = new 
ConcurrentHashMap<>();
+
+    private final ReusableWatch<Event> watch;
+
+    private int port;
+
+    KubernetesCoordinator(final ReusableWatch<Event> watch, final 
Supplier<String> uidSupplier) {
+        this.watch = watch;
+        this.uid = uidSupplier.get();
+    }
+
+    @Override public void registerRemote(RemoteInstance remoteInstance) throws 
ServiceRegisterException {
+        this.port = remoteInstance.getPort();
+        
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
 ThreadFactoryBuilder()
+            
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
+    }
+
+    private void submitTask(final ListeningExecutorService service) {
+        watch.initOrReset();
+        ListenableFuture<?> watchFuture = service.submit(newWatch());
+        Futures.addCallback(watchFuture, new FutureCallback<Object>() {
+            @Override public void onSuccess(@Nullable Object ignored) {
+                submitTask(service);
+            }
+
+            @Override public void onFailure(@Nullable Throwable throwable) {
+                logger.debug("Generate remote nodes error", throwable);
+                submitTask(service);
+            }
+        });
+    }
+
+    private Callable<Object> newWatch() {
+        return () -> {
+            generateRemoteNodes();
+            return null;
+        };
+    }
+
+    private void generateRemoteNodes() {
+        for (Event event : watch) {
+            logger.debug("Received event {} {}-{}", event.getType(), 
event.getUid(), event.getHost());
+            switch (event.getType()) {
+                case "ADDED":
+                case "MODIFIED":
+                    cache.put(event.getUid(), new 
RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
+                    break;
+                case "DELETED":
+                    cache.remove(event.getUid());
+                    break;
+                default:
+                    throw new RuntimeException(String.format("Unknown event 
%s", event.getType()));
+            }
+        }
+    }
+
+    @Override public List<RemoteInstance> queryRemoteNodes() {
+        return Lists.newArrayList(cache.values());
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
new file mode 100644
index 0000000..bbe5ea9
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+/**
+ * This watch can init or reset internal state.
+ *
+ * @param <T> event of watch
+ * @author gaohongtao
+ */
+public interface ReusableWatch<T> extends Iterable<T> {
+
+    /**
+     * Reset internal state.
+     */
+    void initOrReset();
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
new file mode 100644
index 0000000..915bf86
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
+
+import com.google.common.collect.Iterators;
+import com.google.common.reflect.TypeToken;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1Pod;
+import io.kubernetes.client.util.Config;
+import io.kubernetes.client.util.Watch;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
+import 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
+
+/**
+ * Watch the api {@literal 
https://v1-9.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.9/#watch-64}.
+ *
+ * @author gaohongtao
+ */
+public class NamespacedPodListWatch implements ReusableWatch<Event> {
+
+    private final CoreV1Api api = new CoreV1Api();
+
+    private final String namespace;
+
+    private final String labelSelector;
+
+    private final int watchTimeoutSeconds;
+
+    private Watch<V1Pod> watch;
+
+    public NamespacedPodListWatch(final String namespace, final String 
labelSelector, final int watchTimeoutSeconds) {
+        this.namespace = namespace;
+        this.labelSelector = labelSelector;
+        this.watchTimeoutSeconds = watchTimeoutSeconds;
+    }
+
+    @Override public void initOrReset() {
+        ApiClient client;
+        try {
+            client = Config.defaultClient();
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        client.getHttpClient().setReadTimeout(watchTimeoutSeconds, 
TimeUnit.SECONDS);
+        Configuration.setDefaultApiClient(client);
+        try {
+            watch = Watch.createWatch(
+                client,
+                api.listNamespacedPodCall(namespace, null, null, null,
+                    null, labelSelector, Integer.MAX_VALUE,null,null, 
Boolean.TRUE,
+                    null, null),
+                new TypeToken<Watch.Response<V1Pod>>() { }.getType());
+        } catch (ApiException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override public Iterator<Event> iterator() {
+        return Iterators.transform(watch.iterator(), response -> {
+            if (response == null) {
+                throw new NullPointerException("Original event is null");
+            }
+            return new Event(response.type, 
response.object.getMetadata().getUid(), response.object.getStatus().getPodIP());
+        });
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
new file mode 100644
index 0000000..9512c1f
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/UidEnvSupplier.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies;
+
+import java.util.function.Supplier;
+
+/**
+ * Supply uid from environment variable.
+ *
+ * @author gaohongtao
+ */
+public class UidEnvSupplier implements Supplier<String> {
+
+    private final String uidEnvName;
+
+    public UidEnvSupplier(final String uidEnvName) {
+        this.uidEnvName = uidEnvName == null ? "" : uidEnvName;
+    }
+    @Override public String get() {
+        return System.getenv(uidEnvName);
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..81818d4
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/resources/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ClusterModuleKubernetesProvider
\ No newline at end of file
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
new file mode 100644
index 0000000..1ba4bed
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterModuleKubernetesProviderTest {
+
+    private ClusterModuleKubernetesProvider provider;
+
+    @Before
+    public void setUp() {
+        provider = new ClusterModuleKubernetesProvider();
+    }
+
+    @Test
+    public void assertName() {
+        assertThat(provider.name(), is("kubernetes"));
+    }
+
+    @Test
+    public void assertModule() {
+        assertTrue(provider.module().isAssignableFrom(ClusterModule.class));
+    }
+
+    @Test
+    public void assertCreateConfigBeanIfAbsent() {
+        
assertTrue(ClusterModuleKubernetesConfig.class.isInstance(provider.createConfigBeanIfAbsent()));
+    }
+
+    @Test
+    public void assertPrepare() throws ServiceNotProvidedException {
+        provider.prepare();
+        ClusterRegister register = provider.getService(ClusterRegister.class);
+        ClusterNodesQuery query = provider.getService(ClusterNodesQuery.class);
+        assertSame(register, query);
+        assertTrue(KubernetesCoordinator.class.isInstance(register));
+    }
+}
\ No newline at end of file
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
new file mode 100644
index 0000000..35c57f8
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class KubernetesCoordinatorTest {
+
+    private KubernetesCoordinator coordinator;
+
+
+    @Test
+    public void assertAdded() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
 is("10.0.0.1"));
+    }
+
+    @Test
+    public void assertModified() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
+        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
 is("10.0.0.3"));
+    }
+
+    @Test
+    public void assertDeleted() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(1));
+        
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
 is("10.0.0.1"));
+    }
+
+    @Test
+    public void assertError() throws InterruptedException {
+        PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
+        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
+        watch.await();
+        assertThat(coordinator.queryRemoteNodes().size(), is(2));
+        
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(),
 is("10.0.0.1"));
+    }
+}
\ No newline at end of file
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
new file mode 100644
index 0000000..effab35
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event;
+import 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch;
+
+public class PlainWatch implements ReusableWatch<Event> {
+
+    public static PlainWatch create(final int size, final String... args) {
+        List<Event> events = new ArrayList<>(args.length / 3);
+        for (int i = 0; i < args.length; i++) {
+            events.add(new Event(args[i++], args[i++], args[i]));
+        }
+        return new PlainWatch(events, size);
+    }
+
+    private final List<Event> events;
+
+    private final int size;
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    private Iterator<Event> iterator;
+
+    private int count;
+
+    private PlainWatch(final List<Event> events, final int size) {
+        this.events = events;
+        this.size = size;
+    }
+
+    @Override public void initOrReset() {
+        final Iterator<Event> internal = events.subList(count, 
events.size()).iterator();
+        iterator = new Iterator<Event>() {
+            public boolean hasNext() {
+                boolean result =  count < size && internal.hasNext();
+                if (!result) {
+                    latch.countDown();
+                }
+                return result;
+            }
+
+            public Event next() {
+                if (!this.hasNext()) {
+                    throw new NoSuchElementException();
+                } else {
+                    ++count;
+                    return internal.next();
+                }
+            }
+
+            public void remove() {
+                internal.remove();
+            }
+        };
+    }
+
+    @Override public Iterator<Event> iterator() {
+        return iterator;
+    }
+
+    public void await() throws InterruptedException {
+        latch.await();
+    }
+}
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..bc91b6d
--- /dev/null
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<Configuration status="info">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x 
- %m%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="debug">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
diff --git a/oap-server/server-cluster-plugin/pom.xml 
b/oap-server/server-cluster-plugin/pom.xml
index 408fca6..2e65980 100644
--- a/oap-server/server-cluster-plugin/pom.xml
+++ b/oap-server/server-cluster-plugin/pom.xml
@@ -32,6 +32,7 @@
     <modules>
         <module>cluster-zookeeper-plugin</module>
         <module>cluster-standalone-plugin</module>
+        <module>cluster-kubernetes-plugin</module>
     </modules>
 
     <dependencies>
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
index 2c8d761..53c4ea0 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cluster/RemoteInstance.java
@@ -29,6 +29,16 @@ public class RemoteInstance {
     private int port;
     private boolean self = false;
 
+    public RemoteInstance() {
+
+    }
+
+    public RemoteInstance(String host, int port, boolean self) {
+        this.host = host;
+        this.port = port;
+        this.self = self;
+    }
+
     public String getHost() {
         return host;
     }
diff --git a/oap-server/server-starter/src/main/resources/application.yml 
b/oap-server/server-starter/src/main/resources/application.yml
index fac7415..2f27b61 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -15,18 +15,23 @@
 # limitations under the License.
 
 cluster:
-   standalone:
+  standalone:
 #  zookeeper:
 #    hostPort: localhost:2181
 #    # Retry Policy
 #    baseSleepTimeMs: 1000 # initial amount of time to wait between retries
 #    maxRetries: 3 # max number of times to retry
+#  kubernetes:
+#    watchTimeoutSeconds: 60
+#    namespace: default
+#    labelSelector: app=collector,release=skywalking
+#    uidEnvName: SKYWALKING_COLLECTOR_UID
 core:
   default:
-    restHost: localhost
+    restHost: 0.0.0.0
     restPort: 12800
     restContextPath: /
-    gRPCHost: localhost
+    gRPCHost: 0.0.0.0
     gRPCPort: 11800
 storage:
   elasticsearch:

Reply via email to