This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new da5b3559b1 feat: replace kube-apiserver watch with informer (#10543)
da5b3559b1 is described below
commit da5b3559b1576da5d5e41bd8402514e1057cf81d
Author: conghuhu <[email protected]>
AuthorDate: Fri Sep 16 08:26:36 2022 +0800
feat: replace kube-apiserver watch with informer (#10543)
fixes #10535
---
dubbo-dependencies-bom/pom.xml | 2 +-
.../kubernetes/KubernetesMeshEnvListener.java | 106 +++---
.../kubernetes/KubernetesServiceDiscovery.java | 308 +++++++++-------
.../kubernetes/KubernetesServiceDiscoveryTest.java | 399 +++++++++++----------
4 files changed, 449 insertions(+), 366 deletions(-)
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 375e0a5f21..4f38e1432e 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -159,7 +159,7 @@
<eureka.version>1.9.12</eureka.version>
<!-- Fabric8 for Kubernetes -->
- <fabric8_kubernetes_version>5.3.2</fabric8_kubernetes_version>
+ <fabric8_kubernetes_version>6.1.1</fabric8_kubernetes_version>
<!-- Alibaba -->
<alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
diff --git
a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
index 1a0c1fa6b2..300dae3b56 100644
---
a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
+++
b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesMeshEnvListener.java
@@ -21,8 +21,7 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshAppRuleListener;
import org.apache.dubbo.rpc.cluster.router.mesh.route.MeshEnvListener;
-import com.google.gson.Gson;
-import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
+import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
@@ -30,7 +29,6 @@ import io.fabric8.kubernetes.client.WatcherException;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -81,24 +79,27 @@ public class KubernetesMeshEnvListener implements
MeshEnvListener {
try {
Watch watch = kubernetesClient
- .customResource(
- MeshConstant.getVsDefinition())
- .watch(namespace, appName, null, new
ListOptionsBuilder().build(), new Watcher<String>() {
- @Override
- public void eventReceived(Action action, String resource) {
- logger.info("Received VS Rule notification. AppName: "
+ appName + " Action:" + action + " Resource:" + resource);
-
- if (action == Action.ADDED || action ==
Action.MODIFIED) {
- Map drRuleMap = new Gson().fromJson(resource,
Map.class);
- String vsRule = new Yaml(new
SafeConstructor()).dump(drRuleMap);
- vsAppCache.put(appName, vsRule);
- if (drAppCache.containsKey(appName)) {
- notifyListener(vsRule, appName,
drAppCache.get(appName));
+ .genericKubernetesResources(
+ MeshConstant.getVsDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .watch(new Watcher<GenericKubernetesResource>() {
+ @Override
+ public void eventReceived(Action action,
GenericKubernetesResource resource) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received VS Rule notification.
AppName: " + appName + " Action:" + action + " Resource:" + resource);
+ }
+
+ if (action == Action.ADDED || action ==
Action.MODIFIED) {
+ String vsRule = new Yaml(new
SafeConstructor()).dump(resource);
+ vsAppCache.put(appName, vsRule);
+ if (drAppCache.containsKey(appName)) {
+ notifyListener(vsRule, appName,
drAppCache.get(appName));
+ }
+ } else {
+
appRuleListenerMap.get(appName).receiveConfigInfo("");
}
- } else {
-
appRuleListenerMap.get(appName).receiveConfigInfo("");
}
- }
@Override
public void onClose(WatcherException cause) {
@@ -107,15 +108,17 @@ public class KubernetesMeshEnvListener implements
MeshEnvListener {
});
vsAppWatch.put(appName, watch);
try {
- Map<String, Object> vsRule = kubernetesClient
- .customResource(
- MeshConstant.getVsDefinition())
- .get(namespace, appName);
+ GenericKubernetesResource vsRule = kubernetesClient
+ .genericKubernetesResources(
+ MeshConstant.getVsDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .get();
vsAppCache.put(appName, new Yaml(new
SafeConstructor()).dump(vsRule));
} catch (Throwable ignore) {
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Error occurred when listen kubernetes crd.", e);
}
}
@@ -134,42 +137,47 @@ public class KubernetesMeshEnvListener implements
MeshEnvListener {
try {
Watch watch = kubernetesClient
- .customResource(
- MeshConstant.getDrDefinition())
- .watch(namespace, appName, null, new
ListOptionsBuilder().build(), new Watcher<String>() {
- @Override
- public void eventReceived(Action action, String resource) {
- logger.info("Received VS Rule notification. AppName: "
+ appName + " Action:" + action + " Resource:" + resource);
+ .genericKubernetesResources(
+ MeshConstant.getDrDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .watch(new Watcher<GenericKubernetesResource>() {
+ @Override
+ public void eventReceived(Action action,
GenericKubernetesResource resource) {
+ if (logger.isInfoEnabled()) {
+ logger.info("Received VS Rule notification.
AppName: " + appName + " Action:" + action + " Resource:" + resource);
+ }
- if (action == Action.ADDED || action ==
Action.MODIFIED) {
- Map drRuleMap = new Gson().fromJson(resource,
Map.class);
- String drRule = new Yaml(new
SafeConstructor()).dump(drRuleMap);
+ if (action == Action.ADDED || action ==
Action.MODIFIED) {
+ String drRule = new Yaml(new
SafeConstructor()).dump(resource);
- drAppCache.put(appName, drRule);
- if (vsAppCache.containsKey(appName)) {
- notifyListener(vsAppCache.get(appName),
appName, drRule);
+ drAppCache.put(appName, drRule);
+ if (vsAppCache.containsKey(appName)) {
+ notifyListener(vsAppCache.get(appName),
appName, drRule);
+ }
+ } else {
+
appRuleListenerMap.get(appName).receiveConfigInfo("");
}
- } else {
-
appRuleListenerMap.get(appName).receiveConfigInfo("");
}
- }
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ @Override
+ public void onClose(WatcherException cause) {
+ // ignore
+ }
+ });
drAppWatch.put(appName, watch);
try {
- Map<String, Object> drRule = kubernetesClient
- .customResource(
- MeshConstant.getDrDefinition())
- .get(namespace, appName);
+ GenericKubernetesResource drRule = kubernetesClient
+ .genericKubernetesResources(
+ MeshConstant.getDrDefinition())
+ .inNamespace(namespace)
+ .withName(appName)
+ .get();
drAppCache.put(appName, new Yaml(new
SafeConstructor()).dump(drRule));
} catch (Throwable ignore) {
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Error occurred when listen kubernetes crd.", e);
}
}
diff --git
a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
index e0f47f1f95..087de56d31 100644
---
a/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
+++
b/dubbo-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
@@ -39,11 +39,10 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import java.util.HashSet;
import java.util.LinkedList;
@@ -69,18 +68,18 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
public final static String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
- private final static ConcurrentHashMap<String, Watch> SERVICE_WATCHER =
new ConcurrentHashMap<>(64);
+ private final static ConcurrentHashMap<String, AtomicLong>
SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
- private final static ConcurrentHashMap<String, Watch> PODS_WATCHER = new
ConcurrentHashMap<>(64);
+ private final static ConcurrentHashMap<String,
SharedIndexInformer<Service>> SERVICE_INFORMER = new ConcurrentHashMap<>(64);
- private final static ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER =
new ConcurrentHashMap<>(64);
+ private final static ConcurrentHashMap<String, SharedIndexInformer<Pod>>
PODS_INFORMER = new ConcurrentHashMap<>(64);
- private final static ConcurrentHashMap<String, AtomicLong>
SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
+ private final static ConcurrentHashMap<String,
SharedIndexInformer<Endpoints>> ENDPOINTS_INFORMER = new
ConcurrentHashMap<>(64);
public KubernetesServiceDiscovery(ApplicationModel applicationModel, URL
registryURL) {
super(applicationModel, registryURL);
Config config =
KubernetesConfigUtils.createKubernetesConfig(registryURL);
- this.kubernetesClient = new DefaultKubernetesClient(config);
+ this.kubernetesClient = new
KubernetesClientBuilder().withConfig(config).build();
this.currentHostname = System.getenv("HOSTNAME");
this.registryURL = registryURL;
this.namespace = config.getNamespace();
@@ -94,9 +93,9 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
}
if (!availableAccess) {
String message = "Unable to access api server. " +
- "Please check your url config." +
- " Master URL: " + config.getMasterUrl() +
- " Hostname: " + currentHostname;
+ "Please check your url config." +
+ " Master URL: " + config.getMasterUrl() +
+ " Hostname: " + currentHostname;
logger.error(message);
} else {
KubernetesMeshEnvListener.injectKubernetesEnv(kubernetesClient,
namespace);
@@ -104,15 +103,15 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
}
@Override
- public void doDestroy() throws Exception {
- SERVICE_WATCHER.forEach((k, v) -> v.close());
- SERVICE_WATCHER.clear();
+ public void doDestroy() {
+ SERVICE_INFORMER.forEach((k, v) -> v.close());
+ SERVICE_INFORMER.clear();
- PODS_WATCHER.forEach((k, v) -> v.close());
- PODS_WATCHER.clear();
+ PODS_INFORMER.forEach((k, v) -> v.close());
+ PODS_INFORMER.clear();
- ENDPOINTS_WATCHER.forEach((k, v) -> v.close());
- ENDPOINTS_WATCHER.clear();
+ ENDPOINTS_INFORMER.forEach((k, v) -> v.close());
+ ENDPOINTS_INFORMER.clear();
kubernetesClient.close();
}
@@ -121,18 +120,18 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
public void doRegister(ServiceInstance serviceInstance) throws
RuntimeException {
if (enableRegister) {
kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withName(currentHostname)
- .edit(pod ->
- new PodBuilder(pod)
- .editOrNewMetadata()
- .addToAnnotations(KUBERNETES_PROPERTIES_KEY,
JSONObject.toJSONString(serviceInstance.getMetadata()))
- .endMetadata()
- .build());
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit(pod ->
+ new PodBuilder(pod)
+ .editOrNewMetadata()
+
.addToAnnotations(KUBERNETES_PROPERTIES_KEY,
JSONObject.toJSONString(serviceInstance.getMetadata()))
+ .endMetadata()
+ .build());
if (logger.isInfoEnabled()) {
logger.info("Write Current Service Instance Metadata to
Kubernetes pod. " +
- "Current pod name: " + currentHostname);
+ "Current pod name: " + currentHostname);
}
}
}
@@ -150,15 +149,15 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
public void doUnregister(ServiceInstance serviceInstance) throws
RuntimeException {
if (enableRegister) {
kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withName(currentHostname)
- .edit(pod ->
- new PodBuilder(pod)
- .editOrNewMetadata()
- .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
- .endMetadata()
- .build());
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit(pod ->
+ new PodBuilder(pod)
+ .editOrNewMetadata()
+
.removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
+ .endMetadata()
+ .build());
if (logger.isInfoEnabled()) {
logger.info("Remove Current Service Instance from Kubernetes
pod. Current pod name: " + currentHostname);
}
@@ -168,23 +167,33 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
@Override
public Set<String> getServices() {
return kubernetesClient
- .services()
- .inNamespace(namespace)
- .list()
- .getItems()
- .stream()
- .map(service -> service.getMetadata().getName())
- .collect(Collectors.toSet());
+ .services()
+ .inNamespace(namespace)
+ .list()
+ .getItems()
+ .stream()
+ .map(service -> service.getMetadata().getName())
+ .collect(Collectors.toSet());
}
@Override
public List<ServiceInstance> getInstances(String serviceName) throws
NullPointerException {
- Endpoints endpoints =
- kubernetesClient
- .endpoints()
- .inNamespace(namespace)
- .withName(serviceName)
- .get();
+ Endpoints endpoints = null;
+ SharedIndexInformer<Endpoints> endInformer =
ENDPOINTS_INFORMER.get(serviceName);
+ if (endInformer != null) {
+ // get endpoints directly from informer local store
+ List<Endpoints> endpointsList = endInformer.getStore().list();
+ if (endpointsList.size() > 0) {
+ endpoints = endpointsList.get(0);
+ }
+ }
+ if (endpoints == null) {
+ endpoints = kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .get();
+ }
return toServiceInstance(endpoints, serviceName);
}
@@ -206,28 +215,40 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
}
private void watchEndpoints(ServiceInstancesChangedListener listener,
String serviceName) {
- Watch watch = kubernetesClient
- .endpoints()
- .inNamespace(namespace)
- .withName(serviceName)
- .watch(new Watcher<Endpoints>() {
- @Override
- public void eventReceived(Action action, Endpoints resource) {
- if (logger.isDebugEnabled()) {
- logger.debug("Received Endpoint Event. Event type: " +
action.name() +
- ". Current pod name: " + currentHostname);
+ SharedIndexInformer<Endpoints> endInformer = kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .inform(new ResourceEventHandler<Endpoints>() {
+ @Override
+ public void onAdd(Endpoints endpoints) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type:
added. Current pod name: " + currentHostname +
+ ". Endpoints is: " + endpoints);
+ }
+ notifyServiceChanged(serviceName, listener,
toServiceInstance(endpoints, serviceName));
}
- notifyServiceChanged(serviceName, listener);
- }
+ @Override
+ public void onUpdate(Endpoints oldEndpoints, Endpoints
newEndpoints) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type:
updated. Current pod name: " + currentHostname +
+ ". The new Endpoints is: " + newEndpoints);
+ }
+ notifyServiceChanged(serviceName, listener,
toServiceInstance(newEndpoints, serviceName));
+ }
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ @Override
+ public void onDelete(Endpoints endpoints, boolean
deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type:
deleted. Current pod name: " + currentHostname +
+ ". Endpoints is: " + endpoints);
+ }
+ notifyServiceChanged(serviceName, listener,
toServiceInstance(endpoints, serviceName));
+ }
+ });
- ENDPOINTS_WATCHER.put(serviceName, watch);
+ ENDPOINTS_INFORMER.put(serviceName, endInformer);
}
private void watchPods(ServiceInstancesChangedListener listener, String
serviceName) {
@@ -236,68 +257,88 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
return;
}
- Watch watch = kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withLabels(serviceSelector)
- .watch(new Watcher<Pod>() {
- @Override
- public void eventReceived(Action action, Pod resource) {
- if (Action.MODIFIED.equals(action)) {
+ SharedIndexInformer<Pod> podInformer = kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .inform(new ResourceEventHandler<Pod>() {
+ @Override
+ public void onAdd(Pod pod) {
if (logger.isDebugEnabled()) {
- logger.debug("Received Pods Update Event. Current
pod name: " + currentHostname);
+ logger.debug("Received Pods Event. Event type:
added. Current pod name: " + currentHostname +
+ ". Pod is: " + pod);
}
-
- notifyServiceChanged(serviceName, listener);
}
- }
-
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
- PODS_WATCHER.put(serviceName, watch);
- }
-
- private void watchService(ServiceInstancesChangedListener listener, String
serviceName) {
- Watch watch = kubernetesClient
- .services()
- .inNamespace(namespace)
- .withName(serviceName)
- .watch(new Watcher<Service>() {
- @Override
- public void eventReceived(Action action, Service resource) {
- if (Action.MODIFIED.equals(action)) {
+ @Override
+ public void onUpdate(Pod oldPod, Pod newPod) {
if (logger.isDebugEnabled()) {
- logger.debug("Received Service Update Event.
Update Pods Watcher. " +
- "Current pod name: " + currentHostname);
+ logger.debug("Received Pods Event. Event type:
updated. Current pod name: " + currentHostname +
+ ". new Pod is: " + newPod);
}
- if (PODS_WATCHER.containsKey(serviceName)) {
- PODS_WATCHER.get(serviceName).close();
- PODS_WATCHER.remove(serviceName);
+ notifyServiceChanged(serviceName, listener,
getInstances(serviceName));
+ }
+
+ @Override
+ public void onDelete(Pod pod, boolean
deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Pods Event. Event type:
deleted. Current pod name: " + currentHostname +
+ ". Pod is: " + pod);
}
- watchPods(listener, serviceName);
}
- }
+ });
- @Override
- public void onClose(WatcherException cause) {
- // ignore
- }
- });
+ PODS_INFORMER.put(serviceName, podInformer);
+ }
- SERVICE_WATCHER.put(serviceName, watch);
+ private void watchService(ServiceInstancesChangedListener listener, String
serviceName) {
+ SharedIndexInformer<Service> serviceInformer = kubernetesClient
+ .services()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .inform(
+ new ResourceEventHandler<Service>() {
+ @Override
+ public void onAdd(Service service) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Added
Event. " +
+ "Current pod name: " +
currentHostname);
+ }
+ }
+
+ @Override
+ public void onUpdate(Service oldService, Service
newService) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Update
Event. Update Pods Watcher. Current pod name: " + currentHostname +
+ ". The new Service is: " +
newService);
+ }
+ if (PODS_INFORMER.containsKey(serviceName)) {
+ PODS_INFORMER.get(serviceName).close();
+ PODS_INFORMER.remove(serviceName);
+ }
+ watchPods(listener, serviceName);
+ }
+
+ @Override
+ public void onDelete(Service service, boolean
deletedFinalStateUnknown) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Delete
Event. " +
+ "Current pod name: " +
currentHostname);
+ }
+ }
+ }
+ );
+
+ SERVICE_INFORMER.put(serviceName, serviceInformer);
}
- private void notifyServiceChanged(String serviceName,
ServiceInstancesChangedListener listener) {
+ private void notifyServiceChanged(String serviceName,
ServiceInstancesChangedListener listener, List<ServiceInstance>
serviceInstanceList) {
long receivedTime = System.nanoTime();
ServiceInstancesChangedEvent event;
- event = new ServiceInstancesChangedEvent(serviceName,
getInstances(serviceName));
+ event = new ServiceInstancesChangedEvent(serviceName,
serviceInstanceList);
AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
long lastUpdateTime = updateTime.get();
@@ -311,9 +352,9 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
if (logger.isInfoEnabled()) {
logger.info("Discard Service Instance Data. " +
- "Possible Cause: Newer message has been processed or Failed to
update time record by CAS. " +
- "Current Data received time: " + receivedTime + ". " +
- "Newer Data received time: " + lastUpdateTime + ".");
+ "Possible Cause: Newer message has been processed or
Failed to update time record by CAS. " +
+ "Current Data received time: " + receivedTime + ". " +
+ "Newer Data received time: " + lastUpdateTime + ".");
}
}
@@ -336,25 +377,25 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
return new LinkedList<>();
}
Map<String, Pod> pods = kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withLabels(serviceSelector)
- .list()
- .getItems()
- .stream()
- .collect(
- Collectors.toMap(
- pod -> pod.getMetadata().getName(),
- pod -> pod));
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .list()
+ .getItems()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ pod -> pod.getMetadata().getName(),
+ pod -> pod));
List<ServiceInstance> instances = new LinkedList<>();
Set<Integer> instancePorts = new HashSet<>();
for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
instancePorts.addAll(
- endpointSubset.getPorts()
- .stream().map(EndpointPort::getPort)
- .collect(Collectors.toSet()));
+ endpointSubset.getPorts()
+ .stream().map(EndpointPort::getPort)
+ .collect(Collectors.toSet()));
}
for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
@@ -363,10 +404,9 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
String ip = address.getIp();
if (pod == null) {
logger.warn("Unable to match Kubernetes Endpoint address
with Pod. " +
- "EndpointAddress Hostname: " +
address.getTargetRef().getName());
+ "EndpointAddress Hostname: " +
address.getTargetRef().getName());
continue;
}
-
instancePorts.forEach(port -> {
ServiceInstance serviceInstance = new
DefaultServiceInstance(serviceName, ip, port,
ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
@@ -376,8 +416,8 @@ public class KubernetesServiceDiscovery extends
AbstractServiceDiscovery {
instances.add(serviceInstance);
} else {
logger.warn("Unable to find Service Instance metadata
in Pod Annotations. " +
- "Possibly cause: provider has not been initialized
successfully. " +
- "EndpointAddress Hostname: " +
address.getTargetRef().getName());
+ "Possibly cause: provider has not been
initialized successfully. " +
+ "EndpointAddress Hostname: " +
address.getTargetRef().getName());
}
});
}
diff --git
a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
index 6b1a1b0318..28aa0027c4 100644
---
a/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
+++
b/dubbo-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
@@ -14,185 +14,220 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-//package org.apache.dubbo.registry.kubernetes;
-//
-//import org.apache.dubbo.common.URL;
-//import org.apache.dubbo.registry.client.DefaultServiceInstance;
-//import org.apache.dubbo.registry.client.ServiceInstance;
-//import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
-//import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-//import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
-//import org.apache.dubbo.rpc.model.ApplicationModel;
-//import org.apache.dubbo.rpc.model.ScopeModelUtil;
-//
-//import io.fabric8.kubernetes.api.model.Endpoints;
-//import io.fabric8.kubernetes.api.model.EndpointsBuilder;
-//import io.fabric8.kubernetes.api.model.Pod;
-//import io.fabric8.kubernetes.api.model.PodBuilder;
-//import io.fabric8.kubernetes.api.model.Service;
-//import io.fabric8.kubernetes.api.model.ServiceBuilder;
-//import io.fabric8.kubernetes.client.Config;
-//import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
-//import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
-//import org.junit.jupiter.api.AfterEach;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeEach;
-//import org.junit.jupiter.api.Test;
-//import org.junit.jupiter.api.extension.ExtendWith;
-//import org.mockito.ArgumentCaptor;
-//import org.mockito.Mockito;
-//import org.mockito.junit.jupiter.MockitoExtension;
-//
-//import java.util.HashMap;
-//import java.util.HashSet;
-//import java.util.Map;
-//
-//@ExtendWith({MockitoExtension.class})
-//public class KubernetesServiceDiscoveryTest {
-// public KubernetesServer mockServer = new KubernetesServer(false, true);
-//
-// private NamespacedKubernetesClient mockClient;
-//
-// private ServiceInstancesChangedListener mockListener =
Mockito.mock(ServiceInstancesChangedListener.class);
-//
-// private URL serverUrl;
-//
-// private Map<String, String> selector;
-//
-// @BeforeEach
-// public void setUp() {
-// mockServer.before();
-// mockClient = mockServer.getClient();
-//
-// serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
-// .setProtocol("kubernetes")
-// .addParameter(KubernetesClientConst.USE_HTTPS, "false")
-// .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
-// serverUrl.setScopeModel(ApplicationModel.defaultModel());
-//
-//
System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY,
"false");
-//
System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY,
"false");
-//
-// selector = new HashMap<>(4);
-// selector.put("l", "v");
-// Pod pod = new PodBuilder()
-//
.withNewMetadata().withName("TestServer").withLabels(selector).endMetadata()
-// .build();
-//
-// Service service = new ServiceBuilder()
-// .withNewMetadata().withName("TestService").endMetadata()
-// .withNewSpec().withSelector(selector).endSpec().build();
-//
-// Endpoints endPoints = new EndpointsBuilder()
-// .withNewMetadata().withName("TestService").endMetadata()
-// .addNewSubset()
-// .addNewAddress().withIp("ip1")
-//
.withNewTargetRef().withUid("uid1").withName("TestServer").endTargetRef().endAddress()
-// .addNewPort("Test", "Test", 12345, "TCP").endSubset()
-// .build();
-//
-// mockClient.pods().create(pod);
-// mockClient.services().create(service);
-// mockClient.endpoints().create(endPoints);
-// }
-//
-// @AfterEach
-// public void destroy() {
-// mockServer.after();
-// }
-//
-// @Test
-// public void testEndpointsUpdate() throws Exception {
-//
-// KubernetesServiceDiscovery serviceDiscovery = new
KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new
DefaultServiceInstance("TestService", "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// HashSet<String> serviceList = new HashSet<>(4);
-// serviceList.add("TestService");
-// Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-// Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-// serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-// mockClient.endpoints().withName("TestService")
-// .edit(endpoints ->
-// new EndpointsBuilder(endpoints)
-// .editFirstSubset()
-// .addNewAddress()
-// .withIp("ip2")
-//
.withNewTargetRef().withUid("uid2").withName("TestServer").endTargetRef()
-// .endAddress().endSubset()
-// .build());
-//
-// Thread.sleep(5000);
-// ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-// ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-// Mockito.verify(mockListener,
Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
-// Assertions.assertEquals(2,
eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//
-// @Test
-// public void testPodsUpdate() throws Exception {
-//
-// KubernetesServiceDiscovery serviceDiscovery = new
KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new
DefaultServiceInstance("TestService", "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// HashSet<String> serviceList = new HashSet<>(4);
-// serviceList.add("TestService");
-// Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
-// Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
-//
-// serviceDiscovery.addServiceInstancesChangedListener(mockListener);
-//
-// serviceInstance = new DefaultServiceInstance("TestService",
"Test12345", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.update(serviceInstance);
-//
-// Thread.sleep(5000);
-// ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
-// ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
-// Mockito.verify(mockListener,
Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
-// Assertions.assertEquals(1,
eventArgumentCaptor.getValue().getServiceInstances().size());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//
-// @Test
-// public void testGetInstance() throws Exception {
-// KubernetesServiceDiscovery serviceDiscovery = new
KubernetesServiceDiscovery();
-// serviceDiscovery.initialize(serverUrl);
-//
-// serviceDiscovery.setCurrentHostname("TestServer");
-// serviceDiscovery.setKubernetesClient(mockClient);
-//
-// ServiceInstance serviceInstance = new
DefaultServiceInstance("TestService", "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
-// serviceDiscovery.register(serviceInstance);
-//
-// serviceDiscovery.update(serviceInstance);
-//
-// Assertions.assertEquals(1, serviceDiscovery.getServices().size());
-// Assertions.assertEquals(1,
serviceDiscovery.getInstances("TestService").size());
-//
-// Assertions.assertEquals(serviceInstance,
serviceDiscovery.getLocalInstance());
-//
-// serviceDiscovery.unregister(serviceInstance);
-//
-// serviceDiscovery.destroy();
-// }
-//}
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ScopeModelUtil;
+
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.EndpointsBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static
org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst.NAMESPACE;
+
+@ExtendWith({MockitoExtension.class})
+public class KubernetesServiceDiscoveryTest {
+ private static final String SERVICE_NAME = "TestService";
+
+ private static final String POD_NAME = "TestServer";
+
+ public KubernetesServer mockServer = new KubernetesServer(false, true);
+
+ private NamespacedKubernetesClient mockClient;
+
+ private ServiceInstancesChangedListener mockListener =
Mockito.mock(ServiceInstancesChangedListener.class);
+
+ private URL serverUrl;
+
+ private Map<String, String> selector;
+
+ private KubernetesServiceDiscovery serviceDiscovery;
+
+
+ @BeforeEach
+ public void setUp() {
+ mockServer.before();
+ mockClient = mockServer.getClient().inNamespace("dubbo-demo");
+
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+ applicationModel.getApplicationConfigManager().setApplication(new
ApplicationConfig());
+
+ serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
+ .setProtocol("kubernetes")
+ .addParameter(NAMESPACE, "dubbo-demo")
+ .addParameter(KubernetesClientConst.USE_HTTPS, "false")
+ .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
+ serverUrl.setScopeModel(applicationModel);
+
+ this.serviceDiscovery = new
KubernetesServiceDiscovery(applicationModel, serverUrl);
+
+
System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY,
"false");
+
System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY,
"false");
+
+ selector = new HashMap<>(4);
+ selector.put("l", "v");
+ Pod pod = new PodBuilder()
+
.withNewMetadata().withName(POD_NAME).withLabels(selector).endMetadata()
+ .build();
+
+ Service service = new ServiceBuilder()
+ .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+ .withNewSpec().withSelector(selector).endSpec().build();
+
+ Endpoints endPoints = new EndpointsBuilder()
+ .withNewMetadata().withName(SERVICE_NAME).endMetadata()
+ .addNewSubset()
+ .addNewAddress().withIp("ip1")
+
.withNewTargetRef().withUid("uid1").withName(POD_NAME).endTargetRef().endAddress()
+ .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+ .build();
+
+ mockClient.pods().resource(pod).create();
+ mockClient.services().resource(service).create();
+ mockClient.endpoints().resource(endPoints).create();
+ }
+
+ @AfterEach
+ public void destroy() throws Exception {
+ serviceDiscovery.destroy();
+ mockClient.close();
+ mockServer.after();
+ }
+
+ @Test
+ public void testEndpointsUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new
DefaultServiceInstance(SERVICE_NAME, "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+ mockClient.endpoints().withName(SERVICE_NAME)
+ .edit(endpoints ->
+ new EndpointsBuilder(endpoints)
+ .editFirstSubset()
+ .addNewAddress()
+ .withIp("ip2")
+
.withNewTargetRef().withUid("uid2").withName(POD_NAME).endTargetRef()
+ .endAddress().endSubset()
+ .build());
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener,
Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(2,
eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testPodsUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new
DefaultServiceInstance(SERVICE_NAME, "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+ serviceInstance = new DefaultServiceInstance(SERVICE_NAME,
"Test12345", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+ serviceDiscovery.doUpdate(serviceInstance);
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener,
Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(1,
eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testServiceUpdate() throws Exception {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new
DefaultServiceInstance(SERVICE_NAME, "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add(SERVICE_NAME);
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+ selector.put("app", "test");
+ mockClient.services().withName(SERVICE_NAME)
+ .edit(service -> new ServiceBuilder(service)
+ .editSpec()
+ .addToSelector(selector)
+ .endSpec()
+ .build());
+
+ Thread.sleep(2000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener,
Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(1,
eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+
+ @Test
+ public void testGetInstance() {
+ serviceDiscovery.setCurrentHostname(POD_NAME);
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new
DefaultServiceInstance(SERVICE_NAME, "Test", 12345,
ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
+
+ serviceDiscovery.doRegister(serviceInstance);
+
+ serviceDiscovery.doUpdate(serviceInstance);
+
+ Assertions.assertEquals(1, serviceDiscovery.getServices().size());
+ Assertions.assertEquals(1,
serviceDiscovery.getInstances(SERVICE_NAME).size());
+
+ serviceDiscovery.doUnregister(serviceInstance);
+ }
+}