This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch feature/xds
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/feature/xds by this push:
new 623ca4bd82 【xDS】Run through the process & Add example (#14953)
623ca4bd82 is described below
commit 623ca4bd82ffba55549541bb71e3fc8d89ecc930
Author: huajiao-hjyp <[email protected]>
AuthorDate: Wed Dec 4 09:54:30 2024 +0800
【xDS】Run through the process & Add example (#14953)
---
.../org/apache/dubbo/config/ReferenceConfig.java | 1 +
dubbo-demo/dubbo-demo-xds/debug-document.md | 104 +++++++++
.../dubbo-demo-xds-consumer/Dockerfile | 1 +
.../xds/demo/consumer/XdsConsumerApplication.java | 8 +-
.../src/main/resources/application.yml | 2 +-
.../src/main/resources/bootstrap.json | 8 +-
.../dubbo-demo-xds-provider/Dockerfile | 1 +
dubbo-demo/dubbo-demo-xds/images/1.png | Bin 0 -> 185699 bytes
dubbo-demo/dubbo-demo-xds/images/2.png | Bin 0 -> 118355 bytes
dubbo-demo/dubbo-demo-xds/images/3.png | Bin 0 -> 366503 bytes
dubbo-demo/dubbo-demo-xds/images/4.png | Bin 0 -> 295934 bytes
dubbo-demo/dubbo-demo-xds/images/5.png | Bin 0 -> 409066 bytes
dubbo-demo/dubbo-demo-xds/pom.xml | 3 +-
dubbo-demo/dubbo-demo-xds/port_forward.sh | 2 +-
dubbo-demo/dubbo-demo-xds/service-echo.yaml | 197 ++++++++++++++++
dubbo-demo/dubbo-demo-xds/services.yaml | 6 +-
dubbo-demo/dubbo-demo-xds/services_remote.yaml | 4 +-
dubbo-demo/dubbo-demo-xds/{update.sh => start.sh} | 4 +-
.../java/org/apache/dubbo/rpc/RpcInvocation.java | 12 +
.../dubbo-spring-boot/pom.xml | 5 +
.../java/org/apache/dubbo/xds/AdsObserver.java | 22 +-
.../java/org/apache/dubbo/xds/NodeBuilder.java | 29 ++-
.../java/org/apache/dubbo/xds/PilotExchanger.java | 7 +-
.../main/java/org/apache/dubbo/xds/XdsChannel.java | 20 +-
.../dubbo/xds/XdsInitializationException.java | 2 +-
.../apache/dubbo/xds/bootstrap/BootstrapInfo.java | 133 +++++++++++
.../apache/dubbo/xds/bootstrap/Bootstrapper.java | 191 ++++++----------
.../java/org/apache/dubbo/xds/bootstrap/Node.java | 253 +++++++++++++++++++++
.../apache/dubbo/xds/directory/XdsDirectory.java | 139 +++++++----
.../dubbo/xds/resource/XdsClusterResource.java | 4 +
.../dubbo/xds/resource/XdsListenerResource.java | 30 ++-
.../apache/dubbo/xds/resource/XdsResourceType.java | 6 +-
.../apache/dubbo/xds/resource/filter/Filter.java | 57 +++++
.../dubbo/xds/resource/update/CdsUpdate.java | 17 +-
.../org/apache/dubbo/xds/router/XdsRouter.java | 37 ++-
.../apache/dubbo/xds/test/BootstrapperlTest.java | 2 +-
pom.xml | 1 +
37 files changed, 1061 insertions(+), 247 deletions(-)
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index c28011d7fc..14400deb46 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -720,6 +720,7 @@ public class ReferenceConfig<T> extends
ReferenceConfigBase<T> {
return;
}
boolean available = invoker.isAvailable();
+ available = true;
if (available) {
return;
}
diff --git a/dubbo-demo/dubbo-demo-xds/debug-document.md
b/dubbo-demo/dubbo-demo-xds/debug-document.md
new file mode 100644
index 0000000000..4923bfce01
--- /dev/null
+++ b/dubbo-demo/dubbo-demo-xds/debug-document.md
@@ -0,0 +1,104 @@
+# 01 环境配置
+## 1.1 安装Docker Desktop
+前往 **Docker**
官网下载安装。[https://www.docker.com/products/docker-desktop/](https://www.docker.com/products/docker-desktop/)
+
+
+安装完成后,在 **Docker Desktop**中点击 **设置**-> **kubernetes**-> **Enable
kubernetes**开启k8s集群。
+
+> 注意: Mac 开启 k8s 集群时可能会存在拉取镜像问题,解决方法可参考
[https://blog.csdn.net/qq_43705697/article/details/143894239](https://blog.csdn.net/qq_43705697/article/details/143894239)
+>
+
+## 1.2 安装istio
+下载对应的 istioctl 安装包
[https://github.com/istio/istio/releases](https://github.com/istio/istio/releases)
+
+进入到下载包所在路径,执行命令`istioctl install`进行安装。
+
+
+
+> 注意:若 Mac电脑 安装过程中提示无法校验安全性,此时先不要关闭弹出窗口,只需要打开
「设置」-「隐私与安全性」-「仍要运行」,随后再执行一次`istioctl install` 命令,就会看到一个弹窗,点击打开,即可安装。
+
+# 02 远程K8s调试示例
+## 2.1 开启镜像仓库
+部署示例时会在本地打包并推送镜像,所以需要先在本地启动一个镜像仓库。
+
+执行如下命令后,会自动在本地启动一个镜像仓库容器用于存放镜像。
+
+```shell
+docker run -d -p 5000:5000 --restart=always --name local-registry registry:2
+```
+
+## 2.2 拉取&编译代码
+**1、执行命令拉取Dubbo的`feature/xds`分支**
+
+```shell
+git clone -b feature/xds https://github.com/apache/dubbo.git
+```
+
+**2、代码格式化**
+
+```shell
+mvn spotless:apply
+```
+
+**3、编译代码时跳过测试**
+
+```shell
+mvn clean install -DskipTests
+```
+
+## 2.3 运行示例
+在`dubbo/dubbo-demo/dubbo-demo-xds`目录下执行`./start.sh`命令即可运行示例。
+
+`start.sh`脚本主要完成的任务如下:
+
+1、新建名为`dubbo-demo`的`namespace`,并切换到此`namespace`。
+
+2、构建`dubbo-demo-xds-provider`和`dubbo-demo-xds-consumer`镜像,并推送至刚刚开启的本地镜像仓库。构建镜像时将`resource/bootstrap.json`文件拷贝至镜像
`/bootstrap.json`目录下,同时开启远程`debug`端口。
+
+3、通过`service.yaml`文件,创建`k8s`资源。
+
+4、端口转发,将`istiod`的`15010`端口进行转发,方便本地直连`istiod`。将`dubbo-demo-xds-consumer`服务的`31000`端口进行转发,方便远程`debug`。
+
+## 2.4 IDEA开启远程debug
+运行`start.sh`脚本后,通过`Docker
Desktop`查看对应`pod`日志,可以看到`dubbo-demo-xds-provider`服务会自动运行,而`dubbo-demo-xds-consumer`服务暂时挂起,等待调试中。此时需要编辑本地`idea调试配置`,增加断点,即可开始调试。
+
+**1、编辑调试配置**
+
+
+
+**2、新增`Remote JVM
Debug`类型的配置,端口设置为`31000`,`module`选择`dubbo-demo-xds-consumer`。**
+
+
+
+**3、新增断点后,点击调试按钮,即可进行远程调试。**
+
+---
+
+**特别说明**
+
+1、`dubbo-demo-xds-consumer`服务挂起的原因是因为通过`service.yaml`文件部署资源时设置了`suspen=y`,如果仅仅是运行示例,不需要调试,可以修改为`suspend=n`,编译代码后,重新执行`start.sh`进行部署,此时会看到两个服务都会启动。
+
+
+
+2、对于开发人员,每次修改`dubbo-xds`模块代码后,都需要重新执行`mvn spotless:apply`代码格式化,然后执行`mvn clean
install -DskipTests`编译打包,最后执行`./start.sh`构建镜像,重新部署容器。
+
+# 03 本地调试
+上面的示例我们将`provider`和`consumer`服务都部署在了`K8s`中进行远程调试,但是这样有个缺点:一旦更改了`dubbo-xds`模块中的代码,都需要重新编译打包整个项目,耗时较长。
+
+所以现在介绍一种效率更高的开发方法,修改代码后直接点击调试即可,不需要重新编译打包部署。
+
+但这种方式只能用于调试资源加载过程,实际调用`k8s`中的`provider`会因为网络访问不到而失败。
+
+原理:仍然在`k8s`中部署`provider`服务,但是`consumer`服务在本地`IDEA`中进行启动,同时转发`istiod`服务的`15010`端口,确保可以从`istiod`中获取`xds`资源。
+
+整体步骤如下:
+
+1、还是运行`./start.sh`将`provider`服务部署到`k8s`环境中,并且转发了`istiod`服务的`15010`端口。
+
+2、修改本地`dubbo-demo-xds-consumer/src/resource/bootstrap.json`
文件,修改`server_uri为localhost:15010`。
+
+3、在 `XdsConsumerApplication` 启动类`main()`函数中设置环境变量指明`bootstrap.json`所在路径,
`System.setProperty("GRPC_XDS_BOOTSTRAP", "修改为自己的路径")`
+
+4、点击调试即可进行本地调试。
+
+
diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/Dockerfile
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/Dockerfile
index d14156fa09..1ea816e857 100644
--- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/Dockerfile
+++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/Dockerfile
@@ -16,6 +16,7 @@
FROM openjdk:8-jdk
ARG ARTIFACT
ADD ./target/$ARTIFACT app.jar
+ADD ./src/main/resources/bootstrap.json bootstrap.json
CMD java -jar /app.jar
EXPOSE 50050
EXPOSE 31000
diff --git
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java
index 9b714b7a18..68b858eb58 100644
---
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java
+++
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java
@@ -29,7 +29,7 @@ import org.springframework.stereotype.Service;
@Service
@EnableDubbo
public class XdsConsumerApplication {
- @DubboReference(providedBy = "echo:7070")
+ @DubboReference(providedBy = "dubbo-demo-xds-provider-service:50051")
private DemoService demoService;
public static void main(String[] args) throws InterruptedException {
@@ -40,8 +40,12 @@ public class XdsConsumerApplication {
// System.setProperty("NAMESPACE", "dubbo-demo");
// IstioConstant.KUBERNETES_SA_PATH =
"/Users/smzdm/hjf/xds/resources/token";
// System.setProperty(IstioConstant.PILOT_CERT_PROVIDER_KEY, "istiod");
+
+ // System.setProperty("GRPC_XDS_BOOTSTRAP",
+ //
"/Users/hejianfei/code/server/dubbo/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json");
ConfigurableApplicationContext context =
SpringApplication.run(XdsConsumerApplication.class, args);
XdsConsumerApplication application =
context.getBean(XdsConsumerApplication.class);
+ Thread.sleep(10000);
while (true) {
try {
String result = application.doSayHello("world");
@@ -50,7 +54,7 @@ public class XdsConsumerApplication {
} catch (Exception e) {
e.printStackTrace();
}
- Thread.sleep(2000);
+ Thread.sleep(10000);
}
}
diff --git
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml
index 4c2144dc3a..d0fd77f560 100644
---
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml
+++
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml
@@ -26,6 +26,6 @@ dubbo:
name: tri
port: 50050
registry:
- address: xds://47.251.12.148:15010?security=plaintext #
istio://istiod.istio-system.svc:15012
+ address: xds://47.251.12.148:15010?security=plaintext&use-agent=true #
istio://istiod.istio-system.svc:15012
diff --git
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json
index a9daf11483..b68f39e5c7 100644
---
a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json
+++
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json
@@ -1,7 +1,7 @@
{
"xds_servers": [
{
- "server_uri": "47.251.12.148:15010",
+ "server_uri": "istiod.istio-system.svc:15010",
"channel_creds": [
{
"type": "insecure"
@@ -13,7 +13,7 @@
}
],
"node": {
- "id":
"sidecar~192.168.19.141~echo-v1-5764868574-whqs9.echo-grpc~echo-grpc.svc.cluster.local",
+ "id":
"sidecar~192.168.19.141~dubbo-v4-5764868574-whqs9.dubbo-demo~dubbo-demo.svc.cluster.local",
"metadata": {
"ANNOTATIONS": {
"inject.istio.io/templates": "grpc-agent",
@@ -45,8 +45,8 @@
"version": "v1"
},
"MESH_ID": "cluster.local",
- "NAME": "echo-v1-5859d7bc7d-wlb2d",
- "NAMESPACE": "echo-grpc",
+ "NAME": "dubbo-v4-5764868574-whqs9",
+ "NAMESPACE": "dubbo-demo",
"NODE_NAME": "us-west-1.192.168.19.107",
"OWNER":
"kubernetes://apis/apps/v1/namespaces/echo-grpc/deployments/echo-v1",
"PILOT_SAN": [
diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/Dockerfile
b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/Dockerfile
index 9219edcde3..958f2730cd 100644
--- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/Dockerfile
+++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/Dockerfile
@@ -16,6 +16,7 @@
FROM openjdk:8-jdk
ARG ARTIFACT
ADD ./target/$ARTIFACT app.jar
+ADD ./src/main/resources/bootstrap.json bootstrap.json
CMD java -jar /app.jar
EXPOSE 50051
EXPOSE 31001
diff --git a/dubbo-demo/dubbo-demo-xds/images/1.png
b/dubbo-demo/dubbo-demo-xds/images/1.png
new file mode 100644
index 0000000000..e5810c9a92
Binary files /dev/null and b/dubbo-demo/dubbo-demo-xds/images/1.png differ
diff --git a/dubbo-demo/dubbo-demo-xds/images/2.png
b/dubbo-demo/dubbo-demo-xds/images/2.png
new file mode 100644
index 0000000000..6be646c652
Binary files /dev/null and b/dubbo-demo/dubbo-demo-xds/images/2.png differ
diff --git a/dubbo-demo/dubbo-demo-xds/images/3.png
b/dubbo-demo/dubbo-demo-xds/images/3.png
new file mode 100644
index 0000000000..bd2da9a263
Binary files /dev/null and b/dubbo-demo/dubbo-demo-xds/images/3.png differ
diff --git a/dubbo-demo/dubbo-demo-xds/images/4.png
b/dubbo-demo/dubbo-demo-xds/images/4.png
new file mode 100644
index 0000000000..d53803a6c8
Binary files /dev/null and b/dubbo-demo/dubbo-demo-xds/images/4.png differ
diff --git a/dubbo-demo/dubbo-demo-xds/images/5.png
b/dubbo-demo/dubbo-demo-xds/images/5.png
new file mode 100644
index 0000000000..1e8e70838a
Binary files /dev/null and b/dubbo-demo/dubbo-demo-xds/images/5.png differ
diff --git a/dubbo-demo/dubbo-demo-xds/pom.xml
b/dubbo-demo/dubbo-demo-xds/pom.xml
index 97d40fb7d7..7b2b6c8b95 100644
--- a/dubbo-demo/dubbo-demo-xds/pom.xml
+++ b/dubbo-demo/dubbo-demo-xds/pom.xml
@@ -19,8 +19,9 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dubbo</groupId>
- <artifactId>dubbo-demo</artifactId>
+ <artifactId>dubbo-parent</artifactId>
<version>${revision}</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>dubbo-demo-xds</artifactId>
diff --git a/dubbo-demo/dubbo-demo-xds/port_forward.sh
b/dubbo-demo/dubbo-demo-xds/port_forward.sh
index 44c57bb6f8..238ff970e0 100755
--- a/dubbo-demo/dubbo-demo-xds/port_forward.sh
+++ b/dubbo-demo/dubbo-demo-xds/port_forward.sh
@@ -6,7 +6,7 @@ CONSUMER_PORT=50050
PROVIDER_DEBUG_PORT=31001
PROVIDER_PORT=50051
-kubectl port-forward $(kubectl get pods -n istio-system | grep istiod | awk
'{print $1}') 15010:15010 &
+kubectl port-forward $(kubectl get pods -n istio-system | grep istiod | awk
'{print $1}') 15010:15010 -n istio-system &
PID1=$!
kubectl port-forward deployment/dubbo-demo-xds-consumer
$CONSUMER_DEBUG_PORT:$CONSUMER_DEBUG_PORT $CONSUMER_PORT:$CONSUMER_PORT &
PID2=$!
diff --git a/dubbo-demo/dubbo-demo-xds/service-echo.yaml
b/dubbo-demo/dubbo-demo-xds/service-echo.yaml
new file mode 100644
index 0000000000..fcf95f54fd
--- /dev/null
+++ b/dubbo-demo/dubbo-demo-xds/service-echo.yaml
@@ -0,0 +1,197 @@
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ app: echo
+ name: echo
+ namespace: echo-grpc
+spec:
+ selector:
+ app: echo
+ type: ClusterIP
+ ports:
+ - name: http
+ port: 80
+ targetPort: 18080
+ - name: grpc
+ port: 7070
+ targetPort: 17070
+ - name: tcp
+ port: 9090
+ targetPort: 19090
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: echo-v1
+ namespace: echo-grpc
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: echo
+ version: v1
+ template:
+ metadata:
+ annotations:
+ inject.istio.io/templates: grpc-agent
+ proxy.istio.io/config: '{"holdApplicationUntilProxyStarts": true}'
+ labels:
+ app: echo
+ version: v1
+ spec:
+ containers:
+ - args:
+ - --metrics=15014
+ - --port
+ - "18080"
+ - --tcp
+ - "19090"
+ - --xds-grpc-server=17070
+ - --grpc
+ - "17070"
+ - --grpc
+ - "17171"
+ - --port
+ - "3333"
+ - --port
+ - "8080"
+ - --version
+ - v1
+ - --crt=/cert.crt
+ - --key=/cert.key
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ image:
registry.cn-hangzhou.aliyuncs.com/aliacs-app-catalog/asm-grpc-app:latest
+ imagePullPolicy: Always
+ livenessProbe:
+ failureThreshold: 10
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ successThreshold: 1
+ tcpSocket:
+ port: tcp-health-port
+ timeoutSeconds: 1
+ name: app
+ ports:
+ - containerPort: 17070
+ protocol: TCP
+ - containerPort: 17171
+ protocol: TCP
+ - containerPort: 8080
+ protocol: TCP
+ - containerPort: 3333
+ name: tcp-health-port
+ protocol: TCP
+ readinessProbe:
+ failureThreshold: 10
+ httpGet:
+ path: /
+ port: 8080
+ scheme: HTTP
+ initialDelaySeconds: 1
+ periodSeconds: 2
+ successThreshold: 1
+ timeoutSeconds: 1
+ securityContext:
+ runAsGroup: 1338
+ runAsUser: 1338
+ startupProbe:
+ failureThreshold: 10
+ periodSeconds: 10
+ successThreshold: 1
+ tcpSocket:
+ port: tcp-health-port
+ timeoutSeconds: 1
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: echo-v2
+ namespace: echo-grpc
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: echo
+ version: v2
+ template:
+ metadata:
+ annotations:
+ inject.istio.io/templates: grpc-agent
+ proxy.istio.io/config: '{"holdApplicationUntilProxyStarts": true}'
+ labels:
+ app: echo
+ version: v2
+ spec:
+ containers:
+ - args:
+ - --metrics=15014
+ - --xds-grpc-server=17070
+ - --port
+ - "18080"
+ - --tcp
+ - "19090"
+ - --grpc
+ - "17070"
+ - --grpc
+ - "17171"
+ - --port
+ - "3333"
+ - --port
+ - "8080"
+ - --version
+ - v2
+ - --crt=/cert.crt
+ - --key=/cert.key
+ env:
+ - name: INSTANCE_IP
+ valueFrom:
+ fieldRef:
+ apiVersion: v1
+ fieldPath: status.podIP
+ image:
registry.cn-hangzhou.aliyuncs.com/aliacs-app-catalog/asm-grpc-app:latest
+ imagePullPolicy: Always
+ livenessProbe:
+ failureThreshold: 10
+ initialDelaySeconds: 10
+ periodSeconds: 10
+ successThreshold: 1
+ tcpSocket:
+ port: tcp-health-port
+ timeoutSeconds: 1
+ name: app
+ ports:
+ - containerPort: 17070
+ protocol: TCP
+ - containerPort: 17171
+ protocol: TCP
+ - containerPort: 8080
+ protocol: TCP
+ - containerPort: 3333
+ name: tcp-health-port
+ protocol: TCP
+ readinessProbe:
+ failureThreshold: 10
+ httpGet:
+ path: /
+ port: 8080
+ scheme: HTTP
+ initialDelaySeconds: 1
+ periodSeconds: 2
+ successThreshold: 1
+ timeoutSeconds: 1
+ securityContext:
+ runAsGroup: 1338
+ runAsUser: 1338
+ startupProbe:
+ failureThreshold: 10
+ periodSeconds: 10
+ successThreshold: 1
+ tcpSocket:
+ port: tcp-health-port
+ timeoutSeconds: 1
diff --git a/dubbo-demo/dubbo-demo-xds/services.yaml
b/dubbo-demo/dubbo-demo-xds/services.yaml
index e45b95eeb7..45a85f1318 100644
--- a/dubbo-demo/dubbo-demo-xds/services.yaml
+++ b/dubbo-demo/dubbo-demo-xds/services.yaml
@@ -10,7 +10,7 @@ spec:
ports:
- port: 50050
targetPort: 50050
- name: tcp
+ name: http
- port: 31000
targetPort: 31000
name: debug
@@ -29,7 +29,7 @@ spec:
ports:
- port: 50051
targetPort: 50051
- name: tcp
+ name: http
- port: 31001
targetPort: 31001
name: debug
@@ -99,7 +99,7 @@ spec:
- containerPort: 31001
env:
- name: JAVA_TOOL_OPTIONS
- value:
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=31001"
+ value:
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=31001"
replicas: 1
diff --git a/dubbo-demo/dubbo-demo-xds/services_remote.yaml
b/dubbo-demo/dubbo-demo-xds/services_remote.yaml
index 438ac15e9d..dd6ee9896a 100644
--- a/dubbo-demo/dubbo-demo-xds/services_remote.yaml
+++ b/dubbo-demo/dubbo-demo-xds/services_remote.yaml
@@ -10,7 +10,7 @@ spec:
ports:
- port: 50050
targetPort: 50050
- name: tcp
+ name: http
- port: 31000
targetPort: 31000
name: debug
@@ -29,7 +29,7 @@ spec:
ports:
- port: 50051
targetPort: 50051
- name: tcp
+ name: http
- port: 31001
targetPort: 31001
name: debug
diff --git a/dubbo-demo/dubbo-demo-xds/update.sh
b/dubbo-demo/dubbo-demo-xds/start.sh
similarity index 88%
rename from dubbo-demo/dubbo-demo-xds/update.sh
rename to dubbo-demo/dubbo-demo-xds/start.sh
index 82b3659926..90c49da3be 100755
--- a/dubbo-demo/dubbo-demo-xds/update.sh
+++ b/dubbo-demo/dubbo-demo-xds/start.sh
@@ -1,6 +1,8 @@
#!bash
-# docker run -d -p 5000:5000 --name local-registry registry:2 #Use this to
enable docker local repository
+# create dubbo-demo namespace and set context
+kubectl create namespace dubbo-demo
+kubectl config set-context --current --namespace=dubbo-demo
BASE_DIR=$(pwd)
SKIP_PACKAGE=true
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index 4c69c49933..63877e28b1 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -661,6 +661,18 @@ public class RpcInvocation implements Invocation,
Serializable {
}
}
+ public Object getAttachmentObject(String key) {
+ try {
+ attachmentLock.lock();
+ if (attachments == null) {
+ attachments = new HashMap<>();
+ }
+ return attachments.get(key);
+ } finally {
+ attachmentLock.unlock();
+ }
+ }
+
@Deprecated
public void setAttachments(Map<String, String> attachments) {
try {
diff --git a/dubbo-spring-boot-project/dubbo-spring-boot/pom.xml
b/dubbo-spring-boot-project/dubbo-spring-boot/pom.xml
index cbf87b0336..bf2cb21d98 100644
--- a/dubbo-spring-boot-project/dubbo-spring-boot/pom.xml
+++ b/dubbo-spring-boot-project/dubbo-spring-boot/pom.xml
@@ -87,6 +87,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-config-spring</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java
index fc76fa12f7..2c3794d0c7 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.xds.directory.XdsResourceListener;
import org.apache.dubbo.xds.resource.XdsResourceType;
import org.apache.dubbo.xds.resource.update.ResourceUpdate;
import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate;
@@ -56,16 +57,15 @@ public class AdsObserver {
private final Map<XdsResourceType<?>, ConcurrentMap<String,
XdsRawResourceProtocol>> rawResourceListeners =
new ConcurrentHashMap<>();
-
protected StreamObserver<DiscoveryRequest> requestObserver;
private final CompletableFuture<String> future = new CompletableFuture<>();
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls =
new HashMap<>();
- public AdsObserver(URL url, Node node) {
+ public AdsObserver(URL url) {
this.url = url;
- this.node = node;
+ this.node = NodeBuilder.build();
this.xdsChannel = new XdsChannel(url);
this.applicationModel = url.getOrDefaultApplicationModel();
}
@@ -79,13 +79,15 @@ public class AdsObserver {
}
@SuppressWarnings("unchecked")
- public <T extends ResourceUpdate> XdsRawResourceProtocol<T> addListener(
- String resourceName, XdsResourceType<T> clusterResourceType) {
+ public <T extends ResourceUpdate> void addListener(
+ String resourceName, XdsResourceType<T> resourceType,
XdsResourceListener<T> resourceListener) {
ConcurrentMap<String, XdsRawResourceProtocol> resourceListeners =
- rawResourceListeners.computeIfAbsent(clusterResourceType, k ->
new ConcurrentHashMap<>());
- return (XdsRawResourceProtocol<T>) resourceListeners.computeIfAbsent(
- resourceName,
- k -> new XdsRawResourceProtocol<>(this, NodeBuilder.build(),
clusterResourceType, applicationModel));
+ rawResourceListeners.computeIfAbsent(resourceType, k -> new
ConcurrentHashMap<>());
+
+ XdsRawResourceProtocol<T> xdsProtocol = (XdsRawResourceProtocol<T>)
resourceListeners.computeIfAbsent(
+ resourceName, k -> new XdsRawResourceProtocol<>(this, node,
resourceType, applicationModel));
+
+ xdsProtocol.subscribeResource(resourceName, resourceType,
resourceListener);
}
public void adjustResourceSubscription(XdsResourceType<?> resourceType) {
@@ -144,7 +146,7 @@ public class AdsObserver {
// Maybe Using CountDownLatch would be better
String name = Thread.currentThread().getName();
if ("main".equals(name)) {
- future.get(600, TimeUnit.SECONDS);
+ future.get(10000, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/NodeBuilder.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/NodeBuilder.java
index cb28dfe2c2..e62de50886 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/NodeBuilder.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/NodeBuilder.java
@@ -16,7 +16,8 @@
*/
package org.apache.dubbo.xds;
-import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.xds.bootstrap.BootstrapInfo;
+import org.apache.dubbo.xds.bootstrap.Bootstrapper;
import org.apache.dubbo.xds.istio.IstioEnv;
import java.util.HashMap;
@@ -28,18 +29,17 @@ import io.envoyproxy.envoy.config.core.v3.Node;
public class NodeBuilder {
- private static final String SVC_CLUSTER_LOCAL = ".svc.cluster.local";
-
public static Node build() {
- // String podName = System.getenv("metadata.name");
- // String podNamespace = System.getenv("metadata.namespace");
-
- String podName = IstioEnv.getInstance().getPodName();
- String podNamespace = IstioEnv.getInstance().getWorkloadNameSpace();
- String svcName = IstioEnv.getInstance().getIstioMetaClusterId();
+ BootstrapInfo bootstrapInfo = Bootstrapper.getInstance().bootstrap();
+ assert bootstrapInfo.getNode().getMetadata() != null;
+ String podId = bootstrapInfo.getNode().getId();
+ String podNamespace =
+ (String)
bootstrapInfo.getNode().getMetadata().getOrDefault("NAMESPACE",
"EMPTY_NAME_SPACE");
+ String clusterName = (String)
bootstrapInfo.getNode().getMetadata().getOrDefault("CLUSTER_ID", "Kubernetes");
+ String generatorName = (String)
bootstrapInfo.getNode().getMetadata().getOrDefault("GENERATOR", "grpc");
String saName = IstioEnv.getInstance().getServiceAccountName();
- Map<String, Value> metadataMap = new HashMap<>(2);
+ Map<String, Value> metadataMap = new HashMap<>();
metadataMap.put(
"ISTIO_META_NAMESPACE",
@@ -47,14 +47,19 @@ public class NodeBuilder {
metadataMap.put(
"SERVICE_ACCOUNT",
Value.newBuilder().setStringValue(saName).build());
+ metadataMap.put(
+ "GENERATOR",
Value.newBuilder().setStringValue(generatorName).build());
+ metadataMap.put(
+ "NAMESPACE",
Value.newBuilder().setStringValue(podNamespace).build());
+
Struct metadata =
Struct.newBuilder().putAllFields(metadataMap).build();
// id -> sidecar~ip~{POD_NAME}~{NAMESPACE_NAME}.svc.cluster.local
// cluster -> {SVC_NAME}
return Node.newBuilder()
.setMetadata(metadata)
- .setId("sidecar~" + NetUtils.getLocalHost() + "~" + podName +
"~" + podNamespace + SVC_CLUSTER_LOCAL)
- .setCluster(svcName)
+ .setId(podId)
+ .setCluster(clusterName)
.build();
}
}
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java
index e66738ba8d..76230ad653 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java
@@ -38,7 +38,7 @@ public class PilotExchanger {
protected PilotExchanger(URL url) {
this.pollingTimeout = url.getParameter("pollingTimeout", 10);
- adsObserver = new AdsObserver(url, NodeBuilder.build());
+ adsObserver = new AdsObserver(url);
this.applicationModel = url.getOrDefaultApplicationModel();
}
@@ -48,10 +48,7 @@ public class PilotExchanger {
adsObserver.saveSubscribedType(resourceType);
}
- XdsRawResourceProtocol<T> xdsProtocol =
adsObserver.addListener(resourceName, resourceType);
- if (xdsProtocol != null) {
- xdsProtocol.subscribeResource(resourceName, resourceType,
resourceListener);
- }
+ adsObserver.addListener(resourceName, resourceType, resourceListener);
}
public void unSubscribeXdsResource(String clusterName, XdsDirectory
listener) {}
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsChannel.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsChannel.java
index a1be472756..301c090c99 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsChannel.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsChannel.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.xds;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.url.component.URLAddress;
+import org.apache.dubbo.xds.bootstrap.BootstrapInfo;
import org.apache.dubbo.xds.bootstrap.Bootstrapper;
import org.apache.dubbo.xds.security.api.CertPair;
import org.apache.dubbo.xds.security.api.CertSource;
@@ -35,9 +35,6 @@ import
io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-import io.grpc.netty.shaded.io.netty.channel.epoll.EpollDomainSocketChannel;
-import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
-import io.grpc.netty.shaded.io.netty.channel.unix.DomainSocketAddress;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.stub.StreamObserver;
@@ -94,14 +91,13 @@ public class XdsChannel {
.build();
}
} else {
- Bootstrapper bootstrapper = new Bootstrapper();
- Bootstrapper.BootstrapInfo bootstrapInfo =
bootstrapper.bootstrap();
- URLAddress address =
-
URLAddress.parse(bootstrapInfo.getServers().get(0).getTarget(), null, false);
- EpollEventLoopGroup elg = new EpollEventLoopGroup();
- managedChannel = NettyChannelBuilder.forAddress(new
DomainSocketAddress("/" + address.getPath()))
- .eventLoopGroup(elg)
- .channelType(EpollDomainSocketChannel.class)
+ BootstrapInfo bootstrapInfo =
Bootstrapper.getInstance().bootstrap();
+ String server = bootstrapInfo.getServers().get(0).getTarget();
+ // URLAddress address =
URLAddress.parse(bootstrapInfo.getServers().get(0).getTarget(), null, false);
+ // EpollEventLoopGroup elg = new EpollEventLoopGroup();
+ managedChannel = NettyChannelBuilder.forTarget(server)
+ // .eventLoopGroup(elg)
+ // .channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.build();
}
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsInitializationException.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsInitializationException.java
index a57def0b5d..d8c24d26fd 100644
---
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsInitializationException.java
+++
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsInitializationException.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.xds;
-public final class XdsInitializationException extends Exception {
+public final class XdsInitializationException extends RuntimeException {
public XdsInitializationException(String message) {
super(message);
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/BootstrapInfo.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/BootstrapInfo.java
new file mode 100644
index 0000000000..4c8ff16e4e
--- /dev/null
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/BootstrapInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.xds.bootstrap;
+
+import org.apache.dubbo.xds.bootstrap.Bootstrapper.AuthorityInfo;
+import org.apache.dubbo.xds.bootstrap.Bootstrapper.CertificateProviderInfo;
+import org.apache.dubbo.xds.bootstrap.Bootstrapper.ServerInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class BootstrapInfo {
+ private final ImmutableList<ServerInfo> servers;
+ private final Node node;
+
+ @Nullable
+ private final ImmutableMap<String, CertificateProviderInfo> certProviders;
+
+ @Nullable
+ private final String serverListenerResourceNameTemplate;
+
+ private final String clientDefaultListenerResourceNameTemplate;
+ private final ImmutableMap<String, AuthorityInfo> authorities;
+
+ private BootstrapInfo(Builder builder) {
+ this.servers = ImmutableList.copyOf(builder.servers);
+ this.node = builder.node;
+ this.certProviders = builder.certProviders == null ? null :
ImmutableMap.copyOf(builder.certProviders);
+ this.serverListenerResourceNameTemplate =
builder.serverListenerResourceNameTemplate;
+ this.clientDefaultListenerResourceNameTemplate =
builder.clientDefaultListenerResourceNameTemplate;
+ this.authorities = builder.authorities == null ? null :
ImmutableMap.copyOf(builder.authorities);
+ }
+
+ public ImmutableList<ServerInfo> getServers() {
+ return servers;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ @Nullable
+ public ImmutableMap<String, CertificateProviderInfo> getCertProviders() {
+ return certProviders;
+ }
+
+ @Nullable
+ public String getServerListenerResourceNameTemplate() {
+ return serverListenerResourceNameTemplate;
+ }
+
+ public String getClientDefaultListenerResourceNameTemplate() {
+ return clientDefaultListenerResourceNameTemplate;
+ }
+
+ public ImmutableMap<String, AuthorityInfo> getAuthorities() {
+ return authorities;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private List<ServerInfo> servers;
+ private Node node;
+ private Map<String, CertificateProviderInfo> certProviders;
+ private String serverListenerResourceNameTemplate;
+ private String clientDefaultListenerResourceNameTemplate;
+ private Map<String, AuthorityInfo> authorities;
+
+ public Builder servers(List<ServerInfo> servers) {
+ this.servers = servers;
+ return this;
+ }
+
+ public Builder node(Node node) {
+ this.node = node;
+ return this;
+ }
+
+ public Builder certProviders(@Nullable Map<String,
CertificateProviderInfo> certProviders) {
+ this.certProviders = certProviders;
+ return this;
+ }
+
+ public Builder serverListenerResourceNameTemplate(@Nullable String
serverListenerResourceNameTemplate) {
+ this.serverListenerResourceNameTemplate =
serverListenerResourceNameTemplate;
+ return this;
+ }
+
+ public Builder clientDefaultListenerResourceNameTemplate(String
clientDefaultListenerResourceNameTemplate) {
+ this.clientDefaultListenerResourceNameTemplate =
clientDefaultListenerResourceNameTemplate;
+ return this;
+ }
+
+ public Builder authorities(Map<String, AuthorityInfo> authorities) {
+ this.authorities = authorities;
+ return this;
+ }
+
+ public BootstrapInfo build() {
+ return new BootstrapInfo(this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BootstrapInfo{" + "servers=" + servers + ", node=" + node + ",
certProviders=" + certProviders
+ + ", serverListenerResourceNameTemplate='" +
serverListenerResourceNameTemplate + '\''
+ + ", clientDefaultListenerResourceNameTemplate='" +
clientDefaultListenerResourceNameTemplate + '\''
+ + ", authorities=" + authorities + '}';
+ }
+}
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Bootstrapper.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Bootstrapper.java
index 1da4c064a2..596c5dc014 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Bootstrapper.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Bootstrapper.java
@@ -19,24 +19,22 @@ package org.apache.dubbo.xds.bootstrap;
import org.apache.dubbo.xds.XdsInitializationException;
import org.apache.dubbo.xds.XdsLogger;
import org.apache.dubbo.xds.XdsLogger.XdsLogLevel;
-import org.apache.dubbo.xds.bootstrap.EnvoyProtoData.Node;
-
-import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import io.grpc.Internal;
import io.grpc.InternalLogId;
-import io.grpc.internal.JsonParser;
import static com.google.common.base.Preconditions.checkArgument;
@@ -44,9 +42,11 @@ import static
com.google.common.base.Preconditions.checkArgument;
public class Bootstrapper {
public static final String XDSTP_SCHEME = "xdstp:";
+
+ private static Bootstrapper INSTANCE = null;
private static final String BOOTSTRAP_PATH_SYS_ENV_VAR =
"GRPC_XDS_BOOTSTRAP";
private static final String BOOTSTRAP_CONFIG_SYS_ENV_VAR =
"GRPC_XDS_BOOTSTRAP_CONFIG";
- private static final String DEFAULT_BOOTSTRAP_PATH =
"/etc/istio/proxy/grpc-bootstrap.json";
+ private static final String DEFAULT_BOOTSTRAP_PATH = "/bootstrap.json";
public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";
public static final String CLIENT_FEATURE_RESOURCE_IN_SOTW =
"xds.config.resource-in-sotw";
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION =
"ignore_resource_deletion";
@@ -56,7 +56,7 @@ public class Bootstrapper {
protected FileReader reader = LocalFileReader.INSTANCE;
@VisibleForTesting
- public String bootstrapPathFromEnvVar =
System.getenv(BOOTSTRAP_PATH_SYS_ENV_VAR);
+ public String bootstrapPathFromEnvVar = null;
@VisibleForTesting
public String bootstrapConfigFromEnvVar =
System.getenv(BOOTSTRAP_CONFIG_SYS_ENV_VAR);
@@ -65,7 +65,18 @@ public class Bootstrapper {
logger = XdsLogger.withLogId(InternalLogId.allocate("bootstrapper",
null));
}
- public BootstrapInfo bootstrap() throws XdsInitializationException {
+ public static Bootstrapper getInstance() {
+ if (INSTANCE == null) {
+ synchronized (Bootstrapper.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new Bootstrapper();
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ public BootstrapInfo bootstrap() {
String jsonContent;
try {
jsonContent = getJsonContent();
@@ -75,31 +86,37 @@ public class Bootstrapper {
if (jsonContent == null) {
// TODO:try loading from Dubbo control panel and user specified URL
+ return null;
}
- Map<String, ?> rawBootstrap;
+ JsonNode jsonNode;
try {
- rawBootstrap = (Map<String, ?>) JsonParser.parse(jsonContent);
+ ObjectMapper mapper = new ObjectMapper();
+ jsonNode = mapper.readTree(jsonContent);
} catch (IOException e) {
throw new XdsInitializationException("Failed to parse JSON", e);
}
-
- logger.log(XdsLogLevel.DEBUG, "Bootstrap configuration:\n{0}",
rawBootstrap);
- return null;
+ logger.log(XdsLogLevel.DEBUG, "Bootstrap configuration:\n{0}",
jsonNode);
+ return buildBootstrapInfo(jsonNode);
}
private String getJsonContent() throws IOException,
XdsInitializationException {
String jsonContent;
String filePath = null;
- // Check the default path
- if (Files.exists(Paths.get(DEFAULT_BOOTSTRAP_PATH))) {
- filePath = DEFAULT_BOOTSTRAP_PATH;
- } else if (Files.exists(Paths.get(bootstrapPathFromEnvVar))) {
- // Check environment variable and system property
- filePath = bootstrapPathFromEnvVar;
+ // Get the path of the bootstrap config via environment variable and
system property
+ bootstrapPathFromEnvVar = System.getenv(BOOTSTRAP_PATH_SYS_ENV_VAR);
+ if (bootstrapPathFromEnvVar == null) {
+ bootstrapPathFromEnvVar =
System.getProperty(BOOTSTRAP_PATH_SYS_ENV_VAR);
}
+ // Check environment variable and system property
+ if (bootstrapPathFromEnvVar != null &&
Files.exists(Paths.get(bootstrapPathFromEnvVar))) {
+ filePath = bootstrapPathFromEnvVar;
+ } else if (Files.exists(Paths.get(DEFAULT_BOOTSTRAP_PATH))) {
+ // Check the default path
+ filePath = DEFAULT_BOOTSTRAP_PATH;
+ }
if (filePath != null) {
logger.log(XdsLogLevel.INFO, "Reading bootstrap file from {0}",
filePath);
jsonContent = reader.readFile(filePath);
@@ -111,7 +128,37 @@ public class Bootstrapper {
return jsonContent;
}
- public class ServerInfo {
+ private BootstrapInfo buildBootstrapInfo(JsonNode rawBootstrap) {
+ checkArgument(!rawBootstrap.isEmpty(), "Bootstrap configuration cannot
be empty");
+
+ // parse server info
+ JsonNode jsonServer = rawBootstrap.get("xds_servers").get(0);
+ ServerInfo serverInfo = new
ServerInfo(jsonServer.get("server_uri").asText(), null, false);
+
+ // parse node info
+ JsonNode jsonNode = rawBootstrap.get("node");
+ JsonNode jsonMetadata = jsonNode.get("metadata");
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put("CLUSTER_ID", jsonMetadata.get("CLUSTER_ID").asText());
+ metadata.put(
+ "ENVOY_PROMETHEUS_PORT",
+ jsonMetadata.get("ENVOY_PROMETHEUS_PORT").asText());
+ metadata.put("ENVOY_STATUS_PORT",
jsonMetadata.get("ENVOY_STATUS_PORT").asText());
+ metadata.put("GENERATOR", jsonMetadata.get("GENERATOR").asText());
+ metadata.put("NAMESPACE", jsonMetadata.get("NAMESPACE").asText());
+
+ Node node = Node.newBuilder()
+ .setId(jsonNode.get("id").asText())
+ .setMetadata(metadata)
+ .build();
+
+ return BootstrapInfo.builder()
+ .servers(Collections.singletonList(serverInfo))
+ .node(node)
+ .build();
+ }
+
+ public static class ServerInfo {
private final String target;
private final Object implSpecificConfig;
private final boolean ignoreResourceDeletion;
@@ -206,110 +253,6 @@ public class Bootstrapper {
}
}
- public class BootstrapInfo {
- private final ImmutableList<ServerInfo> servers;
- private final Node node;
-
- @Nullable
- private final ImmutableMap<String, CertificateProviderInfo>
certProviders;
-
- @Nullable
- private final String serverListenerResourceNameTemplate;
-
- private final String clientDefaultListenerResourceNameTemplate;
- private final ImmutableMap<String, AuthorityInfo> authorities;
-
- private BootstrapInfo(Builder builder) {
- this.servers = ImmutableList.copyOf(builder.servers);
- this.node = builder.node;
- this.certProviders = builder.certProviders == null ? null :
ImmutableMap.copyOf(builder.certProviders);
- this.serverListenerResourceNameTemplate =
builder.serverListenerResourceNameTemplate;
- this.clientDefaultListenerResourceNameTemplate =
builder.clientDefaultListenerResourceNameTemplate;
- this.authorities = ImmutableMap.copyOf(builder.authorities);
- }
-
- public ImmutableList<ServerInfo> getServers() {
- return servers;
- }
-
- public Node getNode() {
- return node;
- }
-
- @Nullable
- public ImmutableMap<String, CertificateProviderInfo>
getCertProviders() {
- return certProviders;
- }
-
- @Nullable
- public String getServerListenerResourceNameTemplate() {
- return serverListenerResourceNameTemplate;
- }
-
- public String getClientDefaultListenerResourceNameTemplate() {
- return clientDefaultListenerResourceNameTemplate;
- }
-
- public ImmutableMap<String, AuthorityInfo> getAuthorities() {
- return authorities;
- }
-
- public Builder builder() {
- return new
Builder().clientDefaultListenerResourceNameTemplate("%s").authorities(ImmutableMap.of());
- }
-
- public class Builder {
- private List<ServerInfo> servers;
- private Node node;
- private Map<String, CertificateProviderInfo> certProviders;
- private String serverListenerResourceNameTemplate;
- private String clientDefaultListenerResourceNameTemplate;
- private Map<String, AuthorityInfo> authorities;
-
- public Builder servers(List<ServerInfo> servers) {
- this.servers = servers;
- return this;
- }
-
- public Builder node(Node node) {
- this.node = node;
- return this;
- }
-
- public Builder certProviders(@Nullable Map<String,
CertificateProviderInfo> certProviders) {
- this.certProviders = certProviders;
- return this;
- }
-
- public Builder serverListenerResourceNameTemplate(@Nullable String
serverListenerResourceNameTemplate) {
- this.serverListenerResourceNameTemplate =
serverListenerResourceNameTemplate;
- return this;
- }
-
- public Builder clientDefaultListenerResourceNameTemplate(String
clientDefaultListenerResourceNameTemplate) {
- this.clientDefaultListenerResourceNameTemplate =
clientDefaultListenerResourceNameTemplate;
- return this;
- }
-
- public Builder authorities(Map<String, AuthorityInfo> authorities)
{
- this.authorities = authorities;
- return this;
- }
-
- public BootstrapInfo build() {
- return new BootstrapInfo(this);
- }
- }
-
- @Override
- public String toString() {
- return "BootstrapInfo{" + "servers=" + servers + ", node=" + node
+ ", certProviders=" + certProviders
- + ", serverListenerResourceNameTemplate='" +
serverListenerResourceNameTemplate + '\''
- + ", clientDefaultListenerResourceNameTemplate='" +
clientDefaultListenerResourceNameTemplate + '\''
- + ", authorities=" + authorities + '}';
- }
- }
-
@VisibleForTesting
public void setFileReader(FileReader reader) {
this.reader = reader;
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Node.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Node.java
new file mode 100644
index 0000000000..b8545a8db4
--- /dev/null
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/bootstrap/Node.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.xds.bootstrap;
+
+import org.apache.dubbo.common.url.component.URLAddress;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class Node {
+
+ private final String id;
+ private final String cluster;
+
+ @Nullable
+ private final Map<String, Object> metadata;
+
+ @Nullable
+ private final Locality locality;
+
+ private final List<URLAddress> listeningAddresses;
+ private final String buildVersion;
+ private final String userAgentName;
+
+ @Nullable
+ private final String userAgentVersion;
+
+ private final List<String> clientFeatures;
+
+ private Node(
+ String id,
+ String cluster,
+ @Nullable Map<String, Object> metadata,
+ @Nullable Locality locality,
+ List<URLAddress> listeningAddresses,
+ String buildVersion,
+ String userAgentName,
+ @Nullable String userAgentVersion,
+ List<String> clientFeatures) {
+ this.id = checkNotNull(id, "id");
+ this.cluster = checkNotNull(cluster, "cluster");
+ this.metadata = metadata;
+ this.locality = locality;
+ this.listeningAddresses =
Collections.unmodifiableList(checkNotNull(listeningAddresses,
"listeningAddresses"));
+ this.buildVersion = checkNotNull(buildVersion, "buildVersion");
+ this.userAgentName = checkNotNull(userAgentName, "userAgentName");
+ this.userAgentVersion = userAgentVersion;
+ this.clientFeatures =
Collections.unmodifiableList(checkNotNull(clientFeatures, "clientFeatures"));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("cluster", cluster)
+ .add("metadata", metadata)
+ .add("locality", locality)
+ .add("listeningAddresses", listeningAddresses)
+ .add("buildVersion", buildVersion)
+ .add("userAgentName", userAgentName)
+ .add("userAgentVersion", userAgentVersion)
+ .add("clientFeatures", clientFeatures)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Node node = (Node) o;
+ return Objects.equals(id, node.id)
+ && Objects.equals(cluster, node.cluster)
+ && Objects.equals(metadata, node.metadata)
+ && Objects.equals(locality, node.locality)
+ && Objects.equals(listeningAddresses, node.listeningAddresses)
+ && Objects.equals(buildVersion, node.buildVersion)
+ && Objects.equals(userAgentName, node.userAgentName)
+ && Objects.equals(userAgentVersion, node.userAgentVersion)
+ && Objects.equals(clientFeatures, node.clientFeatures);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ id,
+ cluster,
+ metadata,
+ locality,
+ listeningAddresses,
+ buildVersion,
+ userAgentName,
+ userAgentVersion,
+ clientFeatures);
+ }
+
+ public static final class Builder {
+ private String id = "";
+ private String cluster = "";
+
+ @Nullable
+ private Map<String, Object> metadata;
+
+ @Nullable
+ private Locality locality;
+ // TODO(sanjaypujare): eliminate usage of listening_addresses field.
+ private final List<URLAddress> listeningAddresses = new ArrayList<>();
+ private String buildVersion = "";
+ private String userAgentName = "";
+
+ @Nullable
+ private String userAgentVersion;
+
+ private final List<String> clientFeatures = new ArrayList<>();
+
+ private Builder() {}
+
+ @VisibleForTesting
+ public Node.Builder setId(String id) {
+ this.id = checkNotNull(id, "id");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setCluster(String cluster) {
+ this.cluster = checkNotNull(cluster, "cluster");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setMetadata(Map<String, Object> metadata) {
+ this.metadata = checkNotNull(metadata, "metadata");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setLocality(Locality locality) {
+ this.locality = checkNotNull(locality, "locality");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ Node.Builder addListeningAddresses(URLAddress address) {
+ listeningAddresses.add(checkNotNull(address, "address"));
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setBuildVersion(String buildVersion) {
+ this.buildVersion = checkNotNull(buildVersion, "buildVersion");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setUserAgentName(String userAgentName) {
+ this.userAgentName = checkNotNull(userAgentName, "userAgentName");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder setUserAgentVersion(String userAgentVersion) {
+ this.userAgentVersion = checkNotNull(userAgentVersion,
"userAgentVersion");
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public Node.Builder addClientFeatures(String clientFeature) {
+ this.clientFeatures.add(checkNotNull(clientFeature,
"clientFeature"));
+ return this;
+ }
+
+ public Node build() {
+ return new Node(
+ id,
+ cluster,
+ metadata,
+ locality,
+ listeningAddresses,
+ buildVersion,
+ userAgentName,
+ userAgentVersion,
+ clientFeatures);
+ }
+ }
+
+ public static Node.Builder newBuilder() {
+ return new Node.Builder();
+ }
+
+ public Node.Builder toBuilder() {
+ Node.Builder builder = new Node.Builder();
+ builder.id = id;
+ builder.cluster = cluster;
+ builder.metadata = metadata;
+ builder.locality = locality;
+ builder.buildVersion = buildVersion;
+ builder.listeningAddresses.addAll(listeningAddresses);
+ builder.userAgentName = userAgentName;
+ builder.userAgentVersion = userAgentVersion;
+ builder.clientFeatures.addAll(clientFeatures);
+ return builder;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ @Nullable
+ public Map<String, Object> getMetadata() {
+ return metadata;
+ }
+
+ @Nullable
+ public Locality getLocality() {
+ return locality;
+ }
+
+ public List<URLAddress> getListeningAddresses() {
+ return listeningAddresses;
+ }
+}
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java
index 8ed38b5814..1a917724c4 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java
@@ -31,16 +31,15 @@ import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.xds.PilotExchanger;
import
org.apache.dubbo.xds.directory.XdsDirectory.LdsUpdateWatcher.RdsUpdateWatcher;
import org.apache.dubbo.xds.resource.XdsClusterResource;
+import org.apache.dubbo.xds.resource.XdsEndpointResource;
import org.apache.dubbo.xds.resource.XdsListenerResource;
import org.apache.dubbo.xds.resource.XdsRouteConfigureResource;
-import org.apache.dubbo.xds.resource.cluster.OutlierDetection;
import org.apache.dubbo.xds.resource.common.Locality;
import org.apache.dubbo.xds.resource.endpoint.DropOverload;
import org.apache.dubbo.xds.resource.endpoint.LbEndpoint;
import org.apache.dubbo.xds.resource.endpoint.LocalityLbEndpoints;
import org.apache.dubbo.xds.resource.filter.NamedFilterConfig;
import org.apache.dubbo.xds.resource.listener.HttpConnectionManager;
-import org.apache.dubbo.xds.resource.listener.security.UpstreamTlsContext;
import org.apache.dubbo.xds.resource.route.ClusterWeight;
import org.apache.dubbo.xds.resource.route.Route;
import org.apache.dubbo.xds.resource.route.RouteAction;
@@ -68,7 +67,7 @@ import com.google.common.collect.Sets;
public class XdsDirectory<T> extends AbstractDirectory<T> {
- private final URL url;
+ private final URL oriUrl;
private final Class<T> serviceType;
@@ -80,13 +79,19 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
private Protocol protocol;
+ // 资源存储
private final Map<String, VirtualHost> xdsVirtualHostMap = new
ConcurrentHashMap<>();
-
- private final Map<String, EdsUpdate> xdsEndpointMap = new
ConcurrentHashMap<>();
+ private final Map<String, CdsUpdate> xdsClusterMap = new
ConcurrentHashMap<>();
+ private final Map<String, EdsUpdate> xdsEdsMap = new ConcurrentHashMap<>();
+ private final Map<String, BitList<Invoker<T>>> xdsClusterInvokersMap = new
ConcurrentHashMap<>();
private static ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(XdsDirectory.class);
+ /**
+ * 监听器
+ */
private Map<String, LdsUpdateWatcher> ldsWatchers = new HashMap<>();
+
private Map<String, RdsUpdateWatcher> rdsWatchers = new HashMap<>();
private Map<String, CdsUpdateNodeDirectory> cdsWatchers = new HashMap<>();
private Map<String, EdsUpdateLeafDirectory> edsWatchers = new HashMap<>();
@@ -94,13 +99,13 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
public XdsDirectory(Directory<T> directory) {
super(directory.getUrl(), null, true, directory.getConsumerUrl());
this.serviceType = directory.getInterface();
- this.url = directory.getConsumerUrl();
- this.applicationNames = url.getParameter("provided-by").split(",");
- this.protocolName = url.getParameter("protocol", "tri");
+ this.oriUrl = directory.getConsumerUrl();
+ this.applicationNames = oriUrl.getParameter("provided-by").split(",");
+ this.protocolName = oriUrl.getParameter("protocol", "tri");
this.protocol = directory.getProtocol();
super.routerChain = directory.getRouterChain();
this.pilotExchanger =
-
url.getOrDefaultApplicationModel().getBeanFactory().getBean(PilotExchanger.class);
+
oriUrl.getOrDefaultApplicationModel().getBeanFactory().getBean(PilotExchanger.class);
// subscribe resource
for (String applicationName : applicationNames) {
@@ -115,7 +120,15 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
}
public Map<String, EdsUpdate> getXdsEndpointMap() {
- return xdsEndpointMap;
+ return xdsEdsMap;
+ }
+
+ public Map<String, CdsUpdate> getXdsClusterMap() {
+ return xdsClusterMap;
+ }
+
+ public Map<String, BitList<Invoker<T>>> getXdsClusterInvokersMap() {
+ return xdsClusterInvokersMap;
}
public Protocol getProtocol() {
@@ -133,6 +146,11 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
public List<Invoker<T>> doList(
SingleRouterChain<T> singleRouterChain, BitList<Invoker<T>>
invokers, Invocation invocation) {
+ // xds资源放在invocation带入router中
+ invocation.setAttachment("xdsVirtualHostMap", getXdsVirtualHostMap());
+ invocation.setAttachment("xdsClusterMap", getXdsClusterMap());
+ invocation.setAttachment("xdsEdsMap", getXdsEndpointMap());
+
List<Invoker<T>> result =
singleRouterChain.route(this.getConsumerUrl(), invokers, invocation);
return (List) (result == null ? BitList.emptyList() : result);
}
@@ -189,6 +207,9 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
@Override
public void onResourceUpdate(LdsUpdate update) {
+ if (update == null) {
+ return;
+ }
HttpConnectionManager httpConnectionManager =
update.getHttpConnectionManager();
List<VirtualHost> virtualHosts =
httpConnectionManager.getVirtualHosts();
String rdsName = httpConnectionManager.getRdsName();
@@ -218,7 +239,7 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
if (virtualHost == null) {
return;
}
-
+ xdsVirtualHostMap.put(applicationNames[0], virtualHost);
List<Route> routes = virtualHost.getRoutes();
// Populate all clusters to which requests can be routed to
through the virtual host.
@@ -289,15 +310,27 @@ public class XdsDirectory<T> extends AbstractDirectory<T>
{
public class CdsUpdateNodeDirectory implements
XdsResourceListener<CdsUpdate> {
@Override
public void onResourceUpdate(CdsUpdate update) {
- // 啥都不干,就是把 aggregate logicalDns eds 三种做个分类处理,其中eds的不用做什么事情
- if (update.getClusterType() == ClusterType.AGGREGATE) {
- String clusterName = update.getClusterName();
+ if (update == null) {
+ return;
+ }
+ // 根据 cluster 的类型进行相应的处理
+ if (update.getClusterType() == ClusterType.EDS) {
+ // 保存Cluster信息到map中,在route时使用
+ xdsClusterMap.put(update.getClusterName(), update);
+ String edsResourceName =
+ update.getEdsServiceName() != null ?
update.getEdsServiceName() : update.getClusterName();
+ EdsUpdateLeafDirectory edsUpdateWatcher = new
EdsUpdateLeafDirectory(update.getClusterName());
+ edsWatchers.putIfAbsent(edsResourceName, edsUpdateWatcher);
+ pilotExchanger.subscribeXdsResource(
+ edsResourceName, XdsEndpointResource.getInstance(),
edsUpdateWatcher);
+ } else if (update.getClusterType() == ClusterType.AGGREGATE) {
+ // 非叶子节点,继续请求其他cluster信息
for (String cluster : update.getPrioritizedClusterNames()) {
- // create internal node directory.
+ CdsUpdateNodeDirectory cdsUpdateWatcher = new
CdsUpdateNodeDirectory();
+ cdsWatchers.putIfAbsent(cluster, cdsUpdateWatcher);
+ pilotExchanger.subscribeXdsResource(cluster,
XdsClusterResource.getInstance(), cdsUpdateWatcher);
}
- } else if (update.getClusterType() == ClusterType.EDS) {
- // create leaf directory.
- } else {
+ } else if (update.getClusterType() == ClusterType.LOGICAL_DNS) {
}
}
@@ -310,36 +343,32 @@ public class XdsDirectory<T> extends AbstractDirectory<T>
{
*/
public class EdsUpdateLeafDirectory implements
XdsResourceListener<EdsUpdate> {
private final String clusterName;
- private final String edsResourceName;
-
- @Nullable
- protected final Long maxConcurrentRequests;
-
- @Nullable
- protected final UpstreamTlsContext tlsContext;
-
- @Nullable
- protected final OutlierDetection outlierDetection;
+ // private final String edsResourceName;
+ //
+ // @Nullable
+ // protected final Long maxConcurrentRequests;
+ //
+ // @Nullable
+ // protected final UpstreamTlsContext tlsContext;
+ //
+ // @Nullable
+ // protected final OutlierDetection outlierDetection;
private Map<Locality, String> localityPriorityNames =
Collections.emptyMap();
int priorityNameGenId = 1;
- public EdsUpdateLeafDirectory(
- String clusterName,
- String edsResourceName,
- @Nullable Long maxConcurrentRequests,
- @Nullable UpstreamTlsContext tlsContext,
- @Nullable OutlierDetection outlierDetection) {
+ public EdsUpdateLeafDirectory(String clusterName) {
this.clusterName = clusterName;
- this.edsResourceName = edsResourceName;
- this.maxConcurrentRequests = maxConcurrentRequests;
- this.tlsContext = tlsContext;
- this.outlierDetection = outlierDetection;
+ ;
}
@Override
public void onResourceUpdate(EdsUpdate update) {
+ if (update == null) {
+ return;
+ }
+ xdsEdsMap.put(update.getClusterName(), update);
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.getLocalityLbEndpointsMap();
List<DropOverload> dropOverloads = update.getDropPolicies();
List<URLAddress> addresses = new ArrayList<>();
@@ -369,6 +398,8 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
prioritizedLocalityWeights.get(priorityName).put(locality,
localityLbInfo.getLocalityWeight());
}
+ generateInvokersFromEndpoints(addresses);
+
sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
}
@@ -405,6 +436,38 @@ public class XdsDirectory<T> extends AbstractDirectory<T> {
localityPriorityNames = newNames;
return ret;
}
+
+ /**
+ * 根据endpoints生成invoker
+ * @param addresses
+ */
+ private void generateInvokersFromEndpoints(List<URLAddress> addresses)
{
+ BitList<Invoker<T>> invokers = new
BitList<>(Collections.emptyList());
+ addresses.forEach(address -> {
+ URL url = new URL(
+ protocolName,
+ address.getIp(),
+ address.getPort(),
+ serviceType.getName(),
+ oriUrl.getParameters());
+ // set cluster name
+ url = url.addParameter("clusterID", clusterName);
+ // set load balance policy
+ // url = url.addParameter("loadbalance", lbPolicy);
+ // cluster to invoker
+ Invoker<T> invoker = protocol.refer(serviceType, url);
+
+ invokers.add(invoker);
+ });
+ // TODO: Consider cases where some clients are not available
+ // TODO: Need add new api which can add invokers, because a
XdsDirectory need monitor multi clusters.
+
+ BitList<Invoker<T>> oriInvokers = getInvokers();
+ oriInvokers.addAll(invokers);
+ // 设置新的invokers到xdsCluster中
+ setInvokers(invokers);
+ refreshRouter(invokers.clone(), () -> setInvokers(invokers));
+ }
}
//
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java
index 74155f67d5..3f2468fa81 100644
---
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java
+++
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java
@@ -121,6 +121,10 @@ public class XdsClusterResource extends
XdsResourceType<CdsUpdate> {
updateBuilder.lbPolicyConfig(lbPolicyConfig);
+ // updateBuilder.clusterType(cluster.getClusterType().);
+ //
+ // updateBuilder.clusterName(cluster.getName());
+
CdsUpdate cdsUpdate = updateBuilder.build();
cdsUpdate.setRawCluster(cluster); // TODO temp solution for
compatibility
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java
index 6a233c436a..b99fa66959 100644
---
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java
+++
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java
@@ -20,12 +20,10 @@ import org.apache.dubbo.common.lang.Nullable;
import org.apache.dubbo.xds.resource.common.CidrRange;
import org.apache.dubbo.xds.resource.common.ConfigOrError;
import org.apache.dubbo.xds.resource.exception.ResourceInvalidException;
-import org.apache.dubbo.xds.resource.filter.ClientFilter;
import org.apache.dubbo.xds.resource.filter.Filter;
import org.apache.dubbo.xds.resource.filter.FilterConfig;
import org.apache.dubbo.xds.resource.filter.FilterRegistry;
import org.apache.dubbo.xds.resource.filter.NamedFilterConfig;
-import org.apache.dubbo.xds.resource.filter.ServerFilter;
import org.apache.dubbo.xds.resource.filter.router.RouterFilter;
import org.apache.dubbo.xds.resource.listener.FilterChain;
import org.apache.dubbo.xds.resource.listener.FilterChainMatch;
@@ -555,14 +553,26 @@ public class XdsListenerResource extends
XdsResourceType<LdsUpdate> {
return StructOrError.fromError("HttpFilter [" + filterName + "]
contains invalid proto: " + e);
}
Filter filter = filterRegistry.get(typeUrl);
- if ((isForClient && !(filter instanceof ClientFilter)) ||
(!isForClient && !(filter instanceof ServerFilter))) {
- if (isOptional) {
- return null;
- } else {
- return StructOrError.fromError("HttpFilter [" + filterName +
"](" + typeUrl
- + ") is required but unsupported for " + (isForClient
? "client" : "server"));
- }
- }
+ // if ((isForClient && !(filter instanceof ClientFilter)) ||
(!isForClient && !(filter instanceof
+ // ServerFilter))) {
+ // if (isOptional) {
+ // return null;
+ // } else {
+ // return StructOrError.fromError("HttpFilter [" + filterName +
"](" + typeUrl
+ // + ") is required but unsupported for " +
(isForClient ? "client" : "server"));
+ // }
+ // }
+
+ // if ((isForClient && !(filter instanceof
Filter.ClientInterceptorBuilder))
+ // || (!isForClient && !(filter instanceof
Filter.ServerInterceptorBuilder))) {
+ // if (isOptional) {
+ // return null;
+ // } else {
+ // return StructOrError.fromError(
+ // "HttpFilter [" + filterName + "](" + typeUrl + ") is
required but unsupported for "
+ // + (isForClient ? "client" : "server"));
+ // }
+ // }
ConfigOrError<? extends FilterConfig> filterConfig =
filter.parseFilterConfig(rawConfig);
if (filterConfig.errorDetail != null) {
return StructOrError.fromError(
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java
index 2f014f636b..5080e12121 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.xds.resource;
import org.apache.dubbo.common.lang.Nullable;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.xds.bootstrap.Bootstrapper;
+import org.apache.dubbo.xds.bootstrap.BootstrapInfo;
import org.apache.dubbo.xds.bootstrap.Bootstrapper.ServerInfo;
import org.apache.dubbo.xds.resource.exception.ResourceInvalidException;
import org.apache.dubbo.xds.resource.filter.FilterRegistry;
@@ -88,7 +88,7 @@ public abstract class XdsResourceType<T extends
ResourceUpdate> {
final ServerInfo serverInfo;
final String versionInfo;
final String nonce;
- final Bootstrapper.BootstrapInfo bootstrapInfo;
+ final BootstrapInfo bootstrapInfo;
final FilterRegistry filterRegistry;
final LoadBalancerRegistry loadBalancerRegistry;
final TlsContextManager tlsContextManager;
@@ -102,7 +102,7 @@ public abstract class XdsResourceType<T extends
ResourceUpdate> {
ServerInfo serverInfo,
String versionInfo,
String nonce,
- Bootstrapper.BootstrapInfo bootstrapInfo,
+ BootstrapInfo bootstrapInfo,
FilterRegistry filterRegistry,
LoadBalancerRegistry loadBalancerRegistry,
TlsContextManager tlsContextManager,
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/filter/Filter.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/filter/Filter.java
index 0fc9fca5d7..809833fcc4 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/filter/Filter.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/filter/Filter.java
@@ -44,4 +44,61 @@ public interface Filter {
* {@link com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message
rawProtoMessage);
+
+ // interface FilterConfig {
+ // String typeUrl();
+ // }
+
+ /// ** Uses the FilterConfigs produced above to produce an HTTP filter
interceptor for clients. */
+ // interface ClientInterceptorBuilder {
+ // @Nullable
+ // ClientInterceptor buildClientInterceptor(
+ // FilterConfig config, @Nullable FilterConfig overrideConfig,
PickSubchannelArgs args,
+ // ScheduledExecutorService scheduler);
+ // }
+
+ /// ** Uses the FilterConfigs produced above to produce an HTTP filter
interceptor for the server. */
+ // interface ServerInterceptorBuilder {
+ // @Nullable
+ // ServerInterceptor buildServerInterceptor(
+ // FilterConfig config, @Nullable FilterConfig overrideConfig);
+ // }
+ //
+ /// ** Filter config with instance name. */
+ // final class NamedFilterConfig {
+ // // filter instance name
+ // final String name;
+ // final FilterConfig filterConfig;
+ //
+ // NamedFilterConfig(String name, FilterConfig filterConfig) {
+ // this.name = name;
+ // this.filterConfig = filterConfig;
+ // }
+ //
+ // @Override
+ // public boolean equals(Object o) {
+ // if (this == o) {
+ // return true;
+ // }
+ // if (o == null || getClass() != o.getClass()) {
+ // return false;
+ // }
+ // NamedFilterConfig that = (NamedFilterConfig) o;
+ // return Objects.equals(name, that.name)
+ // && Objects.equals(filterConfig, that.filterConfig);
+ // }
+ //
+ // @Override
+ // public int hashCode() {
+ // return Objects.hash(name, filterConfig);
+ // }
+ //
+ // @Override
+ // public String toString() {
+ // return MoreObjects.toStringHelper(this)
+ // .add("name", name)
+ // .add("filterConfig", filterConfig)
+ // .toString();
+ // }
+ // }
}
diff --git
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java
index 4f24b772d9..ad8fd0169b 100644
---
a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java
+++
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java
@@ -136,13 +136,13 @@ public class CdsUpdate implements ResourceUpdate {
long minRingSize,
long maxRingSize,
int choiceCount,
- @Nullable String edsServiceName,
- @Nullable String dnsHostName,
- @Nullable Bootstrapper.ServerInfo lrsServerInfo,
- @Nullable Long maxConcurrentRequests,
- @Nullable UpstreamTlsContext upstreamTlsContext,
- @Nullable List<String> prioritizedClusterNames,
- @Nullable OutlierDetection outlierDetection) {
+ String edsServiceName,
+ String dnsHostName,
+ Bootstrapper.ServerInfo lrsServerInfo,
+ Long maxConcurrentRequests,
+ UpstreamTlsContext upstreamTlsContext,
+ List<String> prioritizedClusterNames,
+ OutlierDetection outlierDetection) {
this.clusterName = clusterName;
this.clusterType = clusterType;
this.lbPolicyConfig = lbPolicyConfig;
@@ -154,7 +154,8 @@ public class CdsUpdate implements ResourceUpdate {
this.lrsServerInfo = lrsServerInfo;
this.maxConcurrentRequests = maxConcurrentRequests;
this.upstreamTlsContext = upstreamTlsContext;
- this.prioritizedClusterNames = Collections.unmodifiableList(new
ArrayList<>(prioritizedClusterNames));
+ this.prioritizedClusterNames = Collections.unmodifiableList(
+ new ArrayList<>(prioritizedClusterNames == null ?
Collections.emptyList() : prioritizedClusterNames));
this.outlierDetection = outlierDetection;
}
diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java
b/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java
index 64d6055bad..39483cc80a 100644
--- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java
+++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java
@@ -22,14 +22,16 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.support.RpcUtils;
-import org.apache.dubbo.xds.PilotExchanger;
import org.apache.dubbo.xds.resource.route.ClusterWeight;
import org.apache.dubbo.xds.resource.route.Route;
import org.apache.dubbo.xds.resource.route.VirtualHost;
+import org.apache.dubbo.xds.resource.update.CdsUpdate;
+import org.apache.dubbo.xds.resource.update.CdsUpdate.ClusterType;
import org.apache.dubbo.xds.resource.update.EdsUpdate;
import java.util.List;
@@ -42,15 +44,13 @@ import static org.apache.dubbo.config.Constants.MESH_KEY;
public class XdsRouter<T> extends AbstractStateRouter<T> {
- private final PilotExchanger pilotExchanger;
-
private Map<String, VirtualHost> xdsVirtualHostMap = new
ConcurrentHashMap<>();
-
- private Map<String, EdsUpdate> xdsClusterMap = new ConcurrentHashMap<>();
+ private Map<String, CdsUpdate> xdsClusterMap = new ConcurrentHashMap<>();
+ private Map<String, EdsUpdate> xdsEdsMap = new ConcurrentHashMap<>();
+ private final Map<String, BitList<Invoker<T>>> xdsClusterInvokersMap = new
ConcurrentHashMap<>();
public XdsRouter(URL url) {
super(url);
- pilotExchanger =
url.getOrDefaultApplicationModel().getBeanFactory().getBean(PilotExchanger.class);
}
@Override
@@ -69,6 +69,9 @@ public class XdsRouter<T> extends AbstractStateRouter<T> {
return invokers;
}
+ // load xds data
+ processXdsData((RpcInvocation) invocation);
+
// 1. match cluster
String matchedCluster = matchCluster(invocation);
@@ -78,11 +81,15 @@ public class XdsRouter<T> extends AbstractStateRouter<T> {
return matchedInvokers;
}
+ private void processXdsData(RpcInvocation invocation) {
+ this.xdsVirtualHostMap = (Map<String, VirtualHost>)
invocation.getAttachmentObject("xdsVirtualHostMap");
+ this.xdsClusterMap = (Map<String, CdsUpdate>)
invocation.getAttachmentObject("xdsClusterMap");
+ this.xdsEdsMap = (Map<String, EdsUpdate>)
invocation.getAttachmentObject("xdsEdsMap");
+ }
+
private String matchCluster(Invocation invocation) {
String cluster = null;
String serviceName =
invocation.getInvoker().getUrl().getParameter("provided-by");
- // VirtualHost xdsVirtualHost =
pilotExchanger.getXdsVirtualHostMap().get(serviceName);
- // FIXME
VirtualHost xdsVirtualHost = xdsVirtualHostMap.get(serviceName);
// match route
@@ -95,6 +102,8 @@ public class XdsRouter<T> extends AbstractStateRouter<T> {
if (cluster == null) {
cluster =
computeWeightCluster(xdsRoute.getRouteAction().getWeightedClusters());
}
+ CdsUpdate xdsCluster = xdsClusterMap.get(cluster);
+ cluster = findCluster(xdsCluster);
}
if (cluster != null) break;
}
@@ -102,6 +111,18 @@ public class XdsRouter<T> extends AbstractStateRouter<T> {
return cluster;
}
+ private String findCluster(CdsUpdate xdsCluster) {
+ if (ClusterType.EDS.equals(xdsCluster.getClusterType())) {
+ return xdsCluster.getEdsServiceName();
+ } else if (ClusterType.AGGREGATE.equals(xdsCluster.getClusterType())) {
+ String cluster = xdsCluster.getPrioritizedClusterNames().get(0);
+ CdsUpdate cdsUpdate = xdsClusterMap.get(cluster);
+ return findCluster(cdsUpdate);
+ } else {
+ return null;
+ }
+ }
+
private String computeWeightCluster(List<ClusterWeight> weightedClusters) {
int totalWeight = Math.max(
weightedClusters.stream().mapToInt(ClusterWeight::getWeight).sum(), 1);
diff --git
a/dubbo-xds/src/test/java/org/apache/dubbo/xds/test/BootstrapperlTest.java
b/dubbo-xds/src/test/java/org/apache/dubbo/xds/test/BootstrapperlTest.java
index c1d4c61088..48fd5f5153 100644
--- a/dubbo-xds/src/test/java/org/apache/dubbo/xds/test/BootstrapperlTest.java
+++ b/dubbo-xds/src/test/java/org/apache/dubbo/xds/test/BootstrapperlTest.java
@@ -17,8 +17,8 @@
package org.apache.dubbo.xds.test;
import org.apache.dubbo.xds.XdsInitializationException;
+import org.apache.dubbo.xds.bootstrap.BootstrapInfo;
import org.apache.dubbo.xds.bootstrap.Bootstrapper;
-import org.apache.dubbo.xds.bootstrap.Bootstrapper.BootstrapInfo;
import org.junit.After;
import org.junit.Before;
diff --git a/pom.xml b/pom.xml
index 2c066ef983..511aa0432b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
<module>dubbo-demo/dubbo-demo-api</module>
<module>dubbo-demo/dubbo-demo-spring-boot</module>
<module>dubbo-demo/dubbo-demo-spring-boot-idl</module>
+ <module>dubbo-demo/dubbo-demo-xds</module>
</modules>
<scm>