This is an automated email from the ASF dual-hosted git repository.

xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 96acac0c feat(ai-gateway): add nacos LLM registry support (#746)
96acac0c is described below

commit 96acac0ce3c16af5fdd0574fee003211771ca870
Author: Xuetao Li <[email protected]>
AuthorDate: Sat Nov 8 13:51:58 2025 +0800

    feat(ai-gateway): add nacos LLM registry support (#746)
    
    * feat: add nacos LLM registry support
    
    # Conflicts:
    #       pkg/model/llm.go
    #       pkg/server/cluster_manager.go
    
    * feat: update doc
    
    * import-format
    
    * feat: add api_key field to endpoint metadata and update documentation
    
    * fix: add NOSONAR comment to checkout action in sync-to-upstream.yml
---
 .github/workflows/sync-to-upstream.yml             |   2 +-
 docs/ai/endpoint.md                                |  30 +-
 docs/ai/endpoint_CN.md                             |  30 +-
 docs/ai/registry.md                                | 209 +++++++++++++
 docs/ai/registry_CN.md                             | 209 +++++++++++++
 go.mod                                             |   2 +-
 .../llmregistry/common/common.go}                  |  24 +-
 .../llmregistry/registry/base/baseregistry.go      | 117 +++++++
 .../llmregistry/registry/base/baseregistry_test.go | 227 ++++++++++++++
 .../llmregistry/registry/listener.go}              |  24 +-
 pkg/adapter/llmregistry/registry/nacos/listener.go | 336 +++++++++++++++++++++
 .../llmregistry/registry/nacos/listener_test.go    | 287 ++++++++++++++++++
 pkg/adapter/llmregistry/registry/nacos/registry.go |  94 ++++++
 pkg/adapter/llmregistry/registry/registry.go       |  56 ++++
 pkg/adapter/llmregistry/registrycenter.go          | 151 +++++++++
 pkg/cluster/retry/countbased/count_based.go        |  29 +-
 pkg/common/constant/key.go                         |   1 +
 pkg/common/constant/url.go                         |   1 +
 pkg/model/cluster.go                               |   4 +-
 pkg/pluginregistry/registry.go                     |   1 +
 pkg/server/adapter_manager.go                      |   2 +-
 pkg/server/cluster_manager.go                      |  13 +-
 22 files changed, 1784 insertions(+), 65 deletions(-)

diff --git a/.github/workflows/sync-to-upstream.yml 
b/.github/workflows/sync-to-upstream.yml
index 3349eb5e..2cc964b0 100644
--- a/.github/workflows/sync-to-upstream.yml
+++ b/.github/workflows/sync-to-upstream.yml
@@ -106,7 +106,7 @@ jobs:
       # SECURITY: Safe for pull_request_target - explicitly checks out 
BASE_BRANCH, not PR head
       - name: Checkout repository
         if: steps.check_branch.outputs.skip != 'true'
-        uses: actions/checkout@v5  # NOSONAR
+        uses: actions/checkout@v5 # NOSONAR
         with:
           fetch-depth: 0
           ref: ${{ env.BASE_BRANCH }}
diff --git a/docs/ai/endpoint.md b/docs/ai/endpoint.md
index 1852ca13..9bdf9bd3 100644
--- a/docs/ai/endpoint.md
+++ b/docs/ai/endpoint.md
@@ -36,22 +36,26 @@ The llm_meta block holds all the configuration specific to 
how the gateway shoul
 - Type: `boolean`
 - Description: Determines if the gateway should proceed to the next endpoint 
in the cluster if all retry attempts on this endpoint fail. When the value is 
`true`, and if this endpoint fails, the gateway will attempt the next available 
endpoint. When the value is `false`, and if this endpoint fails, the process 
stops, and the last error is returned to the client.
 
+`api_key`
+- Type: `string`
+- Description: The API key to be used by this endpoint. This key will be 
included in the request headers when forwarding requests to the LLM service.
+
 `retry_policy`
 
 - Type: `object`
 - Description: An object that defines the retry strategy to use if a request 
to this endpoint fails.
 
-The `retry_policy` object contains the following fields:
+   The `retry_policy` object contains the following fields:
 
-`name`
+   `name`
 
-- Type: `string`
-- Description: The name of the registered retry policy to use. The name is 
case-insensitive.
+  - Type: `string`
+  - Description: The name of the registered retry policy to use. The name is 
case-insensitive.
 
-`config`
+   `config`
 
-- Type: `object`
-- Description: A map of key-value pairs specific to the chosen retry policy 
name.
+  - Type: `object`
+  - Description: A map of key-value pairs specific to the chosen retry policy 
name.
 
 ### Available Retry Policies
 
@@ -126,11 +130,13 @@ clusters:
       # --- Primary Endpoint ---
       - id: deepseek-primary
         socket_address:
-           domains:
-              - api.deepseek.com
+          domains:
+            - api.deepseek.com
         llm_meta:
           # If all retries fail, move to the next endpoint.
           fallback: true
+          # Your API key for this endpoint.
+          api_key: "your_deepseek_api_key_here"
           # Use a robust retry strategy for the primary endpoint.
           retry_policy:
             name: ExponentialBackoff
@@ -143,11 +149,13 @@ clusters:
       # --- Fallback Endpoint ---
       - id: openai-fallback
         socket_address:
-           domains:
-              - api.openai.com/v1
+          domains:
+            - api.openai.com/v1
         llm_meta:
           # This is the last resort; do not fall back further.
           fallback: false
+          # Your API key for this endpoint.
+          api_key: "your_openai_api_key_here"
           # Use a simpler, faster retry for the fallback.
           retry_policy:
             name: CountBased
diff --git a/docs/ai/endpoint_CN.md b/docs/ai/endpoint_CN.md
index b0b07e54..39a41c1e 100644
--- a/docs/ai/endpoint_CN.md
+++ b/docs/ai/endpoint_CN.md
@@ -37,22 +37,26 @@ clusters:
     - `true`: 如果此endpoint失败,网关将尝试下一个可用的 endpoint。
     - `false`: 如果此endpoint失败,则处理停止,并将最后一个错误返回给客户端。对于 fallback 链中的最后一个 endpoint 
,此值应设置为 `false`。
 
+`api_key`
+- **类型**: `string`
+- **描述**: 此 endpoint 使用的 API 密钥。当将请求转发到 LLM 服务时,此密钥将包含在请求头中。
+
 `retry_policy`
 
 - **类型**: `object`
 - **描述**: 一个定义了当对此endpoint的请求失败时要使用的重试策略的对象。
 
-`retry_policy` 对象包含以下字段:
+   `retry_policy` 对象包含以下字段:
 
-`name`
+   `name`
 
-- **类型**: `string`
-- **描述**: 要使用的已注册重试策略的名称。名称不区分大小写。
+  - **类型**: `string`
+  - **描述**: 要使用的已注册重试策略的名称。名称不区分大小写。
 
-`config`
+   `config`
 
-- **类型**: `object`
-- **描述**: 一个键值对的映射,特定于所选的重试策略名称。
+  - **类型**: `object`
+  - **描述**: 一个键值对的映射,特定于所选的重试策略名称。
 
 ### 可用的重试策略
 
@@ -129,11 +133,13 @@ clusters:
       # --- 主endpoint ---
       - id: deepseek-primary
         socket_address:
-           domains:
-              - api.deepseek.com
+          domains:
+            - api.deepseek.com
         llm_meta:
           # 如果所有重试都失败,则移至下一个 endpoint。
           fallback: true
+          # 此 endpoint 使用的 API 密钥。
+          api_key: "your_deepseek_api_key_here"
           # 为主 endpoint 使用稳健的重试策略。
           retry_policy:
             name: ExponentialBackoff
@@ -146,11 +152,13 @@ clusters:
       # --- fallback endpoint ---
       - id: openai-fallback
         socket_address:
-           domains:
-              - api.openai.com/v1
+          domains:
+            - api.openai.com/v1
         llm_meta:
           # 这是最后的选择;不要再进一步 fallback。
           fallback: false
+          # 此 endpoint 使用的 API 密钥。
+          api_key: "your_openai_api_key_here"
           # 为 fallback endpoint 使用更简单、更快速的重试。
           retry_policy:
             name: CountBased
diff --git a/docs/ai/registry.md b/docs/ai/registry.md
new file mode 100644
index 00000000..e0a465d6
--- /dev/null
+++ b/docs/ai/registry.md
@@ -0,0 +1,209 @@
+## LLM Service Discovery and Registration
+
+English | [中文](registry_CN.md)
+
+This document aims to guide LLM service providers on how to dynamically 
register their service instances with the LLM Gateway via a Nacos registry. By 
following these guidelines, the gateway will be able to automatically discover 
your service and apply appropriate routing, retry, and fallback strategies 
based on the metadata you provide.
+
+### Registration Mechanism Overview
+
+The core mechanism of service discovery is that your LLM service registers as 
a **Nacos instance** and provides a specific set of **metadata** upon 
registration. The LLM Gateway listens for service changes in Nacos, reads this 
metadata, and dynamically converts it into a fully functional gateway 
`endpoint` configuration.
+
+A basic Nacos registration request includes the following key information:
+
+- **`ServiceName`**: The name of your service collection (e.g., 
`deepseek-service`).
+- **`Ip` & `Port`**: The network address where your service instance listens 
for traffic.
+- **`Metadata`**: A collection of key-value pairs, **which is crucial for 
configuring all gateway behaviors**.
+
+### `metadata` Configuration Fields
+
+All gateway-specific configurations are passed through the `metadata` field of 
the Nacos instance. Below are all the supported `metadata` keys and their 
descriptions.
+
+`cluster`
+
+- **Type**: `string`
+- **Required**: Yes
+- **Description**: Defines which cluster (`cluster`) in the gateway this 
endpoint should belong to. The gateway aggregates service instances with the 
same `cluster` name based on this value.
+- **Example**: `"deepseek_cluster"`
+
+`id`
+
+- **Type**: `string`
+- **Required**: Yes
+- **Description**: The unique identifier for the endpoint. It must be unique 
within its `cluster`.
+- **Example**: `"deepseek-primary-instance-1"`
+
+`ip`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: An optional parameter, useful when your service needs to 
register an address with the gateway that is different from its internal IP 
(e.g., a publicly accessible IP). If not provided, the gateway will use the 
default value `0.0.0.0`.
+- **Example**: `"203.0.113.55"`
+
+`port`
+
+- **Type**: `string` (representing an integer)
+- **Required**: No
+- **Description**: An optional parameter to override the `Port` registered by 
the Nacos instance itself. Its usage is similar to the `ip` field.
+- **Example**: `"9000"`
+
+`name`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: A human-readable name to identify this endpoint. Primarily 
used for logging and monitoring.
+- **Example**: `"DeepSeek V2 Chat (Primary)"`
+
+`address`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: A string split by comma, each string stands for a address 
+- **Example**: `"api.deepseek.com"`
+
+`llm-meta.fallback`
+
+- **Type**: `string` ("true" or "false")
+- **Required**: No, defaults to `"false"`
+- **Description**: Determines whether the gateway should proceed to the next 
endpoint in the cluster if all retry attempts on this endpoint fail.
+
+`llm-meta.api_key`
+
+- **Type**: `string` 
+- **Required**: No
+- **Description**: The API key to be used by this endpoint.
+
+`llm-meta.retry_policy.name`
+
+- **Type**: `string`
+- **Required**: No, defaults to `"NoRetry"`
+- **Description**: Specifies the name of the retry policy to use when a 
request to this endpoint fails. The name is case-insensitive.
+
+`llm-meta.retry_policy.config`
+
+- **Type**: `string` (JSON format)
+- **Required**: No
+- **Description**: A JSON string containing configuration parameters specific 
to the selected retry policy.
+
+### Retry Policy Metadata
+
+You can configure retry behavior by using a combination of 
`llm-meta.retry_policy.name` and `llm-meta.retry_policy.config`.
+
+1.  `CountBased`
+
+    - **name**: `"CountBased"`
+    - **config**: A JSON string containing a `times` field.
+        - `times` (integer): Number of retry attempts.
+    - **Example**:
+        - `llm-meta.retry_policy.name`: `"CountBased"`
+        - `llm-meta.retry_policy.config`: `{"times": 2}`
+
+2.  `ExponentialBackoff`
+
+    - **name**: `"ExponentialBackoff"`
+    - **config**: A JSON string containing `times`, `initialInterval`, 
`maxInterval`, and `multiplier` fields.
+        - `times` (integer): Number of retries.
+        - `initialInterval` (string): Initial wait duration (e.g., "200ms").
+        - `maxInterval` (string): Maximum wait duration (e.g., "5s").
+        - `multiplier` (float): The multiplier factor for the delay.
+    - **Example**:
+        - `llm-meta.retry_policy.name`: `"ExponentialBackoff"`
+        - `llm-meta.retry_policy.config`: `{"times": 3, "initialInterval": 
"200ms", "maxInterval": "5s", "multiplier": 2.0}`
+
+3.  `NoRetry`
+
+    - **name**: `"NoRetry"`
+    - **config**: This policy does not require a `config` field.
+
+### Complete Registration Example (Go)
+
+Detailed usage please refer to our [official 
samples](https://github.com/apache/dubbo-go-pixiu-samples/tree/main/llm/nacos)。
+
+The configuration file of pixiu, need to enable the adapter of 
llmregistrycenter, as follows:
+
+```yaml
+  adapters:
+    - id: test
+      name: dgp.adapter.llmregistrycenter
+      config:
+        registries:
+          nacos:
+            protocol: nacos
+            address: "127.0.0.1:8848"
+            timeout: "5s"
+            group: test_llm_registry_group
+            namespace: public
+```
+
+This example demonstrates how to register a fully-featured LLM service 
instance using the Nacos Go SDK. The instance will be configured as the primary 
endpoint in the `deepseek_cluster`, using an exponential backoff retry policy, 
and will fall back to the next service in the cluster upon failure.
+
+```go
+package main
+
+import (
+       "encoding/json"
+       "log"
+
+       "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+func main() {
+       // ... (Code for creating the Nacos client is omitted here)
+       // client, err := createNacosClient()
+
+       // 1. Prepare the JSON configuration for the retry policy
+       retryConfig := map[string]interface{}{
+               "times":           3,
+               "initialInterval": "200ms",
+               "maxInterval":     "8s",
+               "multiplier":      2.5,
+       }
+       retryConfigJSON, _ := json.Marshal(retryConfig)
+
+       // 2. Construct the metadata containing all gateway configurations
+       metadata := map[string]string{
+               // --- Core Endpoint Configuration ---
+               "cluster": "deepseek_cluster",
+               "id":      "deepseek-primary",
+               "name":    "DeepSeek V2 Chat (Primary)",
+
+               // Optional (use ip+port or address): The instance's IP and Port
+               "ip":   "203.0.113.55",
+               "port": "9000",
+
+               // Optional (use ip+port or address): address field
+               "address": "api.deepseek.com",
+
+               // --- LLM-Specific Metadata ---
+               "llm-meta.fallback": "true",
+
+               // API Keys in JSON string format
+               "llm-meta.api_keys": "key-xxxxxxxx",
+
+               // --- Retry Policy Configuration ---
+               "llm-meta.retry_policy.name":   "ExponentialBackoff",
+               "llm-meta.retry_policy.config": string(retryConfigJSON),
+       }
+
+       // 3. Register the Nacos instance
+       // Note: The Ip and Port here are the actual listening addresses of the 
service instance,
+       // while the ip and port in the metadata are the addresses you want the 
gateway to access.
+       _, err := client.RegisterInstance(vo.RegisterInstanceParam{
+               Ip:          "192.168.1.10", // The service's internal IP
+               Port:        8001,           // The service's internal port
+               ServiceName: "deepseek-service",
+               GroupName:   "DEFAULT_GROUP",
+               Ephemeral:   true,
+               Healthy:     true,
+               Weight:      10,
+               Metadata:    metadata,
+       })
+
+       if err != nil {
+               log.Fatalf("Failed to register service instance: %v", err)
+       }
+
+       log.Println("Service instance registered successfully!")
+       // ...
+}
+
+```
\ No newline at end of file
diff --git a/docs/ai/registry_CN.md b/docs/ai/registry_CN.md
new file mode 100644
index 00000000..a19e4df7
--- /dev/null
+++ b/docs/ai/registry_CN.md
@@ -0,0 +1,209 @@
+## LLM 服务发现与注册
+
+[English](registry.md) | 中文
+
+本文档旨在指导 LLM 服务提供商如何通过 Nacos 注册中心,将其服务实例动态地注册到 LLM 
网关。通过遵循这些准则,网关将能够自动发现您的服务,并根据您提供的元数据(metadata)应用相应的路由、重试和 fallback 策略。
+
+### 注册机制概述
+
+服务发现的核心机制是:您的 LLM 服务作为一个**Nacos 实例**进行注册,并在注册时提供一组特定的**元数据**。LLM 网关会监听 Nacos 
中的服务变更,读取这些元数据,并将其动态地转换为一个功能齐全的网关 `endpoint` 配置。
+
+一个基本的 Nacos 注册请求包含以下关键信息:
+
+- **`ServiceName`**: 您的服务集合的名称 (例如, `deepseek-service`)。
+- **`Ip` & `Port`**: 您的服务实例监听流量的网络地址。
+- **`Metadata`**: 一个键值对集合,**这是配置所有网关行为的关键**。
+
+### `metadata` 配置字段
+
+所有网关的特定配置都通过 Nacos 实例的 `metadata` 字段进行传递。以下是所有支持的 `metadata` 键及其说明。
+
+`cluster`
+
+- **类型**: `string`
+- **必需**: 是
+- **描述**: 定义此 endpoint 应归属到网关中的哪个集群 (`cluster`)。网关会根据此值将具有相同 `cluster` 
名称的服务实例聚合在一起。
+- **示例**: `"deepseek_cluster"`
+
+`id`
+
+- **类型**: `string`
+- **必需**: 是
+- **描述**: endpoint 的唯一标识符。它在所属的 `cluster` 内必须是唯一的。
+- **示例**: `"deepseek-primary-instance-1"`
+
+`ip`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 可选参数,当您的服务需要向网关注册一个不同于其内部 IP 的地址时(例如,一个可公开访问的 
IP),此字段非常有用。如果未提供,网关将使用默认值 `0.0.0.0`。
+- **示例**: `"203.0.113.55"`
+
+`port`
+
+- **类型**: `string` (代表一个整数)
+- **必需**: 否
+- **描述**: 可选参数,用于覆盖 Nacos 实例本身注册的 `Port`。用法与 `ip` 字段类似。
+- **示例**: `"9000"`
+
+`name`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 一个人类可读的名称,用于标识此 endpoint。主要用于日志和监控。
+- **示例**: `"DeepSeek V2 Chat (Primary)"`
+
+`address`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 以逗号分隔的字符串,每一个字符串代表了一个 address
+- **示例**: `"api.deepseek.com"`
+
+`llm-meta.fallback`
+
+- **类型**: `string` ("true" 或 "false")
+- **必需**: 否, 默认为 `"false"`
+- **描述**: 决定如果在此 endpoint 上的所有重试尝试都失败后,网关是否应继续处理集群中的下一个 endpoint。
+
+`llm-meta.api_key`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 此 endpoint 使用的 API 密钥。
+
+`llm-meta.retry_policy.name`
+
+- **类型**: `string`
+- **必需**: 否, 默认为 `"NoRetry"`
+- **描述**: 指定当对此 endpoint 的请求失败时要使用的重试策略的名称。名称不区分大小写。
+
+`llm-meta.retry_policy.config`
+
+- **类型**: `string` (JSON 格式)
+- **必需**: 否
+- **描述**: 一个 JSON 字符串,包含特定于所选重试策略的配置参数。
+
+### 重试策略元数据
+
+您可以通过组合使用 `llm-meta.retry_policy.name` 和 `llm-meta.retry_policy.config` 
来配置重试行为。
+
+1.  `CountBased` (基于次数)
+
+    - **name**: `"CountBased"`
+    - **config**: 一个包含 `times` 字段的 JSON 字符串。
+        - `times` (integer): 重试的次数。
+    - **示例**:
+        - `llm-meta.retry_policy.name`: `"CountBased"`
+        - `llm-meta.retry_policy.config`: `{"times": 2}`
+
+2.  `ExponentialBackoff` (指数退避)
+
+    - **name**: `"ExponentialBackoff"`
+    - **config**: 一个包含 `times`, `initialInterval`, `maxInterval`, 和 
`multiplier` 字段的 JSON 字符串。
+        - `times` (integer): 重试次数。
+        - `initialInterval` (string): 初始等待时长 (例如 "200ms")。
+        - `maxInterval` (string): 最大等待时长 (例如 "5s")。
+        - `multiplier` (float): 延迟时间的乘数因子。
+    - **示例**:
+        - `llm-meta.retry_policy.name`: `"ExponentialBackoff"`
+        - `llm-meta.retry_policy.config`: `{"times": 3, "initialInterval": 
"200ms", "maxInterval": "5s", "multiplier": 2.0}`
+
+3.  `NoRetry` (不重试)
+
+    - **name**: `"NoRetry"`
+    - **config**: 此策略不需要 `config` 字段。
+
+### 完整注册示例 (Go)
+
+更详细的使用方法可以参考[官方示例](https://github.com/apache/dubbo-go-pixiu-samples/tree/main/llm/nacos)。
+
+pixiu 配置文件,需要启用llmregistrycenter这个适配器,示例如下:
+
+```yaml
+  adapters:
+    - id: test
+      name: dgp.adapter.llmregistrycenter
+      config:
+        registries:
+          nacos:
+            protocol: nacos
+            address: "127.0.0.1:8848"
+            timeout: "5s"
+            group: test_llm_registry_group
+            namespace: public
+```
+
+此示例展示了如何使用 Nacos Go SDK 注册一个功能完整的 LLM 服务实例。该实例将被配置为 `deepseek_cluster` 中的主 
endpoint,使用指数退避重试策略,并在失败时 fallback 到集群中的下一个服务。
+
+```go
+package main
+
+import (
+       "encoding/json"
+       "log"
+
+       "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+func main() {
+       // ... (此处省略了创建 Nacos 客户端的代码)
+       // client, err := createNacosClient()
+
+       // 1. 准备重试策略的 JSON 配置
+       retryConfig := map[string]interface{}{
+               "times":           3,
+               "initialInterval": "200ms",
+               "maxInterval":     "8s",
+               "multiplier":      2.5,
+       }
+       retryConfigJSON, _ := json.Marshal(retryConfig)
+
+       // 2. 构造包含所有网关配置的 metadata
+       metadata := map[string]string{
+               // --- 核心 Endpoint 配置 ---
+               "cluster": "deepseek_cluster",
+               "id":      "deepseek-primary",
+               "name":    "DeepSeek V2 Chat (Primary)",
+
+               // 可选: (使用ip+port或者address): 实例的 IP 和 Port
+               "ip":   "203.0.113.55",
+               "port": "9000",
+
+               // 可选: (使用ip+port或者address): address 列表
+               "address": "api.deepseek.com",
+
+               // --- LLM 特定元数据 ---
+               "llm-meta.fallback": "true",
+
+               // 使用 JSON 字符串格式的 API Keys
+               "llm-meta.api_key": "key-xxxxxxxx",
+
+               // --- 重试策略配置 ---
+               "llm-meta.retry_policy.name":   "ExponentialBackoff",
+               "llm-meta.retry_policy.config": string(retryConfigJSON),
+       }
+
+       // 3. 注册 Nacos 实例
+       // 注意:这里的 Ip 和 Port 是服务实例的实际监听地址,
+       // 而 metadata 中的 ip 和 port 是希望网关访问的地址。
+       _, err := client.RegisterInstance(vo.RegisterInstanceParam{
+               Ip:          "192.168.1.10", // 服务的内部 IP
+               Port:        8001,           // 服务的内部端口
+               ServiceName: "deepseek-service",
+               GroupName:   "DEFAULT_GROUP",
+               Ephemeral:   true,
+               Healthy:     true,
+               Weight:      10,
+               Metadata:    metadata,
+       })
+
+       if err != nil {
+               log.Fatalf("注册服务实例失败: %v", err)
+       }
+
+       log.Println("服务实例注册成功!")
+       // ...
+}
+
+```
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 9304f06e..e14a1bba 100644
--- a/go.mod
+++ b/go.mod
@@ -28,6 +28,7 @@ require (
        github.com/golang-jwt/jwt/v4 v4.5.2
        github.com/golang/mock v1.6.0
        github.com/golang/protobuf v1.5.4
+       github.com/hashicorp/go-uuid v1.0.3
        github.com/jhump/protoreflect v1.17.0
        github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
        github.com/lestrrat-go/httprc/v3 v3.0.1
@@ -155,7 +156,6 @@ require (
        github.com/grpc-ecosystem/grpc-opentracing 
v0.0.0-20180507213350-8e809c8a8645 // indirect
        github.com/hashicorp/errwrap v1.1.0 // indirect
        github.com/hashicorp/go-multierror v1.1.1 // indirect
-       github.com/hashicorp/go-uuid v1.0.3 // indirect
        github.com/hashicorp/golang-lru v0.5.4 // indirect
        github.com/hashicorp/hcl v1.0.0 // indirect
        github.com/hashicorp/vault/sdk v0.7.0 // indirect
diff --git a/pkg/common/constant/url.go 
b/pkg/adapter/llmregistry/common/common.go
similarity index 59%
copy from pkg/common/constant/url.go
copy to pkg/adapter/llmregistry/common/common.go
index 14aeeef0..dc2ef0ef 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/adapter/llmregistry/common/common.go
@@ -14,19 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package constant
 
-const (
-       // RequestBody name of api config mapping from/to
-       RequestBody = "requestBody"
-       // QueryStrings name of api config mapping from/to
-       QueryStrings = "queryStrings"
-       // Headers name of api config mapping from/to
-       Headers = "headers"
-       // RequestURI name of api config mapping from/to, retrieve parameters 
from uri
-       // for instance, https://test.com/:id uri.id will retrieve the :id 
parameter
-       RequestURI = "uri"
-       // Dot defines the . which will be used to present the path to specific 
field in the body
-       Dot      = "."
-       AnyValue = "*"
+package common
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
 )
+
+type RegistryEventListener interface {
+       OnAddEndpoint(r *model.Endpoint) error
+       OnRemoveEndpoint(r *model.Endpoint) error
+       //OnDeleteRouter(r config.Resource) error
+}
diff --git a/pkg/adapter/llmregistry/registry/base/baseregistry.go 
b/pkg/adapter/llmregistry/registry/base/baseregistry.go
new file mode 100644
index 00000000..18f2d8e8
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/base/baseregistry.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package baseregistry
+
+import (
+       "sync"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+)
+
+type FacadeRegistry interface {
+       // DoSubscribe subscribes the registry cluster to monitor the changes.
+       DoSubscribe() error
+       // DoUnsubscribe unsubscribes the registry cluster.
+       DoUnsubscribe() error
+}
+
+type SvcListeners struct {
+       // listeners use url.ServiceKey as the index.
+       listeners    map[string]registry.Listener
+       listenerLock sync.Mutex
+}
+
+// GetListener returns existing listener or nil
+func (s *SvcListeners) GetListener(id string) registry.Listener {
+       s.listenerLock.Lock()
+       defer s.listenerLock.Unlock()
+       listener, ok := s.listeners[id]
+       if !ok {
+               return nil
+       }
+       return listener
+}
+
+// SetListener set the listener to the registry
+func (s *SvcListeners) SetListener(id string, listener registry.Listener) {
+       s.listenerLock.Lock()
+       defer s.listenerLock.Unlock()
+       s.listeners[id] = listener
+}
+
+// RemoveListener removes the listener of the registry
+func (s *SvcListeners) RemoveListener(id string) {
+       s.listenerLock.Lock()
+       defer s.listenerLock.Unlock()
+       delete(s.listeners, id)
+}
+
+func (s *SvcListeners) GetAllListener() map[string]registry.Listener {
+       s.listenerLock.Lock()
+       defer s.listenerLock.Unlock()
+       return s.listeners
+}
+
+type BaseRegistry struct {
+       svcListeners    *SvcListeners
+       facadeRegistry  FacadeRegistry
+       AdapterListener common.RegistryEventListener
+}
+
+func NewBaseRegistry(facade FacadeRegistry, adapterListener 
common.RegistryEventListener) *BaseRegistry {
+       return &BaseRegistry{
+               facadeRegistry: facade,
+               svcListeners: &SvcListeners{
+                       listeners: make(map[string]registry.Listener),
+               },
+               AdapterListener: adapterListener,
+       }
+}
+
+// GetSvcListener returns existing listener or nil
+func (r *BaseRegistry) GetSvcListener(id string) registry.Listener {
+       return r.svcListeners.GetListener(id)
+}
+
+// GetAllSvcListener get all the listener of the registry
+func (r *BaseRegistry) GetAllSvcListener() map[string]registry.Listener {
+       return r.svcListeners.GetAllListener()
+}
+
+// SetSvcListener set the listener to the registry
+func (r *BaseRegistry) SetSvcListener(id string, listener registry.Listener) {
+       r.svcListeners.SetListener(id, listener)
+}
+
+// RemoveSvcListener remove the listener of the registry
+func (r *BaseRegistry) RemoveSvcListener(id string) {
+       r.svcListeners.RemoveListener(id)
+}
+
+// Subscribe monitors the target registry.
+func (r *BaseRegistry) Subscribe() error {
+       return r.facadeRegistry.DoSubscribe()
+}
+
+// Unsubscribe stops monitoring the target registry.
+func (r *BaseRegistry) Unsubscribe() error {
+       return r.facadeRegistry.DoUnsubscribe()
+}
diff --git a/pkg/adapter/llmregistry/registry/base/baseregistry_test.go 
b/pkg/adapter/llmregistry/registry/base/baseregistry_test.go
new file mode 100644
index 00000000..146947b4
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/base/baseregistry_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package baseregistry
+
+import (
+       "errors"
+       "sync"
+       "testing"
+)
+
+import (
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+// mockFacadeRegistry is a mock implementation of the FacadeRegistry interface 
for testing.
+type mockFacadeRegistry struct {
+       subscribeCalled   bool
+       unsubscribeCalled bool
+       subscribeErr      error
+       unsubscribeErr    error
+}
+
+func (m *mockFacadeRegistry) DoSubscribe() error {
+       m.subscribeCalled = true
+       return m.subscribeErr
+}
+
+func (m *mockFacadeRegistry) DoUnsubscribe() error {
+       m.unsubscribeCalled = true
+       return m.unsubscribeErr
+}
+
+// mockListener is a mock implementation of the registry.Listener interface.
+// Since BaseRegistry only stores and retrieves it, we don't need a complex 
implementation.
+type mockListener struct{}
+
+func (m *mockListener) WatchAndHandle() {
+       panic("implement me") // NOSONAR
+}
+
+// Close is a mock method.
+func (m *mockListener) Close() {} // NOSONAR
+
+// mockAdapterListener is a mock implementation of the 
common.RegistryEventListener interface.
+type mockAdapterListener struct{}
+
+func (m *mockAdapterListener) OnAddEndpoint(r *model.Endpoint) error {
+       panic("implement me") // NOSONAR
+}
+
+func (m *mockAdapterListener) OnRemoveEndpoint(r *model.Endpoint) error {
+       panic("implement me") // NOSONAR
+}
+
+func TestSvcListeners(t *testing.T) {
+       // Initialization
+       svcListeners := &SvcListeners{
+               listeners: make(map[string]registry.Listener),
+       }
+       mockL := &mockListener{}
+       id := "test-service-1"
+
+       // 1. Test SetListener and GetListener
+       t.Run("SetAndGet", func(t *testing.T) {
+               // Try to get a non-existent listener
+               listener := svcListeners.GetListener(id)
+               assert.Nil(t, listener, "Getting a non-existent listener should 
return nil")
+
+               // Set and then get an existing listener
+               svcListeners.SetListener(id, mockL)
+               listener = svcListeners.GetListener(id)
+               assert.NotNil(t, listener, "Getting an existing listener should 
not return nil")
+               assert.Equal(t, mockL, listener, "The retrieved listener should 
be the same one that was set")
+       })
+
+       // 2. Test GetAllListener
+       t.Run("GetAll", func(t *testing.T) {
+               id2 := "test-service-2"
+               mockL2 := &mockListener{}
+               svcListeners.SetListener(id2, mockL2)
+
+               allListeners := svcListeners.GetAllListener()
+               assert.Len(t, allListeners, 2, "GetAllListener should return 
all set listeners")
+               assert.Contains(t, allListeners, id, "The returned map should 
contain the first listener")
+               assert.Contains(t, allListeners, id2, "The returned map should 
contain the second listener")
+       })
+
+       // 3. Test RemoveListener
+       t.Run("Remove", func(t *testing.T) {
+               svcListeners.RemoveListener(id)
+               listener := svcListeners.GetListener(id)
+               assert.Nil(t, listener, "Getting a removed listener should 
return nil")
+
+               allListeners := svcListeners.GetAllListener()
+               assert.Len(t, allListeners, 1, "The total count of listeners 
should decrease after removal")
+               assert.NotContains(t, allListeners, id, "The returned map 
should no longer contain the removed listener")
+       })
+}
+
+// Test the concurrency safety of SvcListeners.
+func TestSvcListeners_concurrency(t *testing.T) {
+       svcListeners := &SvcListeners{
+               listeners: make(map[string]registry.Listener),
+       }
+       id := "concurrent-service"
+       mockL := &mockListener{}
+
+       var wg sync.WaitGroup
+       // Start multiple goroutines to read and write concurrently
+       for i := 0; i < 100; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       svcListeners.SetListener(id, mockL)
+                       _ = svcListeners.GetListener(id)
+                       svcListeners.RemoveListener(id)
+               }()
+       }
+
+       wg.Wait()
+       // The main purpose of this test is to detect race conditions.
+       // Running tests with the `go test -race` command can expose such 
issues.
+       // If the test completes without errors, it indicates the lock 
mechanism is working correctly.
+}
+
+// --- Tests for BaseRegistry ---
+
+// Test the constructor NewBaseRegistry.
+func TestNewBaseRegistry(t *testing.T) {
+       mockFacade := &mockFacadeRegistry{}
+       mockAdapter := &mockAdapterListener{}
+
+       br := NewBaseRegistry(mockFacade, mockAdapter)
+
+       assert.NotNil(t, br, "NewBaseRegistry should not return nil")
+       assert.Equal(t, mockFacade, br.facadeRegistry, "facadeRegistry should 
be initialized correctly")
+       assert.Equal(t, mockAdapter, br.AdapterListener, "AdapterListener 
should be initialized correctly")
+       assert.NotNil(t, br.svcListeners, "svcListeners should be initialized")
+       assert.Empty(t, br.svcListeners.listeners, "The svcListeners map should 
be empty initially")
+}
+
+// Test the listener management methods of BaseRegistry.
+func TestBaseRegistry_listenerMethods(t *testing.T) {
+       br := NewBaseRegistry(&mockFacadeRegistry{}, &mockAdapterListener{})
+       mockL := &mockListener{}
+       id := "test-service"
+
+       // Test SetSvcListener and GetSvcListener
+       l := br.GetSvcListener(id)
+       assert.Nil(t, l)
+
+       br.SetSvcListener(id, mockL)
+       l = br.GetSvcListener(id)
+       assert.NotNil(t, l)
+       assert.Equal(t, mockL, l)
+
+       // Test GetAllSvcListener
+       allListeners := br.GetAllSvcListener()
+       assert.Len(t, allListeners, 1)
+       assert.Equal(t, mockL, allListeners[id])
+
+       // Test RemoveSvcListener
+       br.RemoveSvcListener(id)
+       l = br.GetSvcListener(id)
+       assert.Nil(t, l)
+       allListeners = br.GetAllSvcListener()
+       assert.Empty(t, allListeners)
+}
+
+// Test the Subscribe and Unsubscribe methods.
+func TestBaseRegistry_subscribeUnsubscribe(t *testing.T) {
+       t.Run("Success", func(t *testing.T) {
+               mockFacade := &mockFacadeRegistry{}
+               br := NewBaseRegistry(mockFacade, &mockAdapterListener{})
+
+               // Test Subscribe
+               err := br.Subscribe()
+               assert.NoError(t, err)
+               assert.True(t, mockFacade.subscribeCalled, 
"facadeRegistry.DoSubscribe should have been called")
+
+               // Test Unsubscribe
+               err = br.Unsubscribe()
+               assert.NoError(t, err)
+               assert.True(t, mockFacade.unsubscribeCalled, 
"facadeRegistry.DoUnsubscribe should have been called")
+       })
+
+       t.Run("Error", func(t *testing.T) {
+               subscribeError := errors.New("failed to subscribe")
+               unsubscribeError := errors.New("failed to unsubscribe")
+
+               mockFacade := &mockFacadeRegistry{
+                       subscribeErr:   subscribeError,
+                       unsubscribeErr: unsubscribeError,
+               }
+               br := NewBaseRegistry(mockFacade, &mockAdapterListener{})
+
+               // Test Subscribe error propagation
+               err := br.Subscribe()
+               assert.Error(t, err)
+               assert.Equal(t, subscribeError, err, "Subscribe should 
propagate the error from facadeRegistry")
+
+               // Test Unsubscribe error propagation
+               err = br.Unsubscribe()
+               assert.Error(t, err)
+               assert.Equal(t, unsubscribeError, err, "Unsubscribe should 
propagate the error from facadeRegistry")
+       })
+}
diff --git a/pkg/common/constant/url.go 
b/pkg/adapter/llmregistry/registry/listener.go
similarity index 58%
copy from pkg/common/constant/url.go
copy to pkg/adapter/llmregistry/registry/listener.go
index 14aeeef0..3f4c1bcd 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/adapter/llmregistry/registry/listener.go
@@ -14,19 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package constant
 
-const (
-       // RequestBody name of api config mapping from/to
-       RequestBody = "requestBody"
-       // QueryStrings name of api config mapping from/to
-       QueryStrings = "queryStrings"
-       // Headers name of api config mapping from/to
-       Headers = "headers"
-       // RequestURI name of api config mapping from/to, retrieve parameters 
from uri
-       // for instance, https://test.com/:id uri.id will retrieve the :id 
parameter
-       RequestURI = "uri"
-       // Dot defines the . which will be used to present the path to specific 
field in the body
-       Dot      = "."
-       AnyValue = "*"
-)
+package registry
+
+// Listener this interface defined for load services from different kinds 
registry, such as nacos,consul,zookeeper.
+type Listener interface {
+       // Close closes this listener
+       Close()
+       // WatchAndHandle watch the target path and handle the event
+       WatchAndHandle()
+}
diff --git a/pkg/adapter/llmregistry/registry/nacos/listener.go 
b/pkg/adapter/llmregistry/registry/nacos/listener.go
new file mode 100644
index 00000000..ff064a80
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/listener.go
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "encoding/json"
+       "reflect"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+)
+
+import (
+       "dubbo.apache.org/dubbo-go/v3/remoting"
+
+       "github.com/creasty/defaults"
+
+       "github.com/hashicorp/go-uuid"
+
+       "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+       nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+       "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+const (
+       // ServicePollingInterval How often to poll for the list of available 
services (to discover new ones).
+       ServicePollingInterval = 30 * time.Second
+)
+
+// listener monitors Nacos for service changes.
+type listener struct {
+       client naming_client.INamingClient
+       // Caches the last known instances for a given service. Key: 
ServiceName, Value: *sync.Map
+       instanceCache sync.Map
+       // Caches the set of services we are currently subscribed to. Key: 
ServiceName, Value: bool
+       subscribedServices sync.Map
+       regConf            *model.Registry
+       adapterListener    common.RegistryEventListener // The callback to 
notify the gateway core.
+
+       exit chan struct{}
+       wg   sync.WaitGroup
+}
+
+// newNacosListener creates a new Nacos service listener.
+func newNacosListener(client naming_client.INamingClient, regConf 
*model.Registry, adapterListener common.RegistryEventListener) *listener {
+       return &listener{
+               client:          client,
+               exit:            make(chan struct{}),
+               regConf:         regConf,
+               adapterListener: adapterListener,
+       }
+}
+
+// WatchAndHandle starts the background goroutine to watch for service changes.
+func (l *listener) WatchAndHandle() {
+       l.wg.Add(1)
+       go l.watchForServices()
+}
+
+// watchForServices periodically polls Nacos to discover new services to 
subscribe to.
+// The actual instance updates for subscribed services are push-based via the 
callback.
+func (l *listener) watchForServices() {
+       defer l.wg.Done()
+
+       ticker := time.NewTicker(ServicePollingInterval)
+       defer ticker.Stop()
+
+       // Perform an initial check immediately.
+       l.discoverAndSubscribe()
+
+       for {
+               select {
+               case <-l.exit:
+                       logger.Info("Nacos listener is stopping...")
+                       l.unsubscribeAll()
+                       return
+               case <-ticker.C:
+                       l.discoverAndSubscribe()
+               }
+       }
+}
+
+func (l *listener) discoverAndSubscribe() {
+       serviceList, err := 
l.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+               GroupName: l.regConf.Group,
+               NameSpace: l.regConf.Namespace,
+       })
+       if err != nil {
+               logger.Warnf("Failed to get service list from Nacos: %v", err)
+               return
+       }
+
+       currentServices := make(map[string]struct{})
+       for _, serviceName := range serviceList.Doms {
+               currentServices[serviceName] = struct{}{}
+               // If we aren't already subscribed to this service, subscribe 
now.
+               if _, loaded := l.subscribedServices.LoadOrStore(serviceName, 
true); !loaded {
+                       err := l.client.Subscribe(&vo.SubscribeParam{
+                               ServiceName:       serviceName,
+                               GroupName:         l.regConf.Group,
+                               SubscribeCallback: l.serviceCallback,
+                       })
+                       if err != nil {
+                               logger.Errorf("Failed to subscribe to Nacos 
service %s: %v", serviceName, err)
+                               l.subscribedServices.Delete(serviceName) // 
Remove from map to retry next time.
+                       } else {
+                               logger.Infof("Successfully subscribed to Nacos 
service: %s", serviceName)
+                       }
+               }
+       }
+
+       // Unsubscribe from services that no longer exist.
+       l.subscribedServices.Range(func(key, value any) bool {
+               serviceName := key.(string)
+               if _, exists := currentServices[serviceName]; !exists {
+                       err := l.client.Unsubscribe(&vo.SubscribeParam{
+                               ServiceName: serviceName,
+                               GroupName:   l.regConf.Group,
+                       })
+                       if err != nil {
+                               logger.Errorf("Failed to unsubscribe from Nacos 
service %s: %v", serviceName, err)
+                       } else {
+                               logger.Infof("Successfully unsubscribed from 
Nacos service: %s", serviceName)
+                               l.subscribedServices.Delete(serviceName)
+                               l.instanceCache.Delete(serviceName)
+                       }
+               }
+               return true
+       })
+}
+
+// serviceCallback is the function that Nacos SDK invokes when there's a change
+// in the instances of a subscribed service. This is the injected callback.
+func (l *listener) serviceCallback(services []nacosModel.SubscribeService, err 
error) {
+       if err != nil {
+               logger.Errorf("Nacos subscribe callback received an error: %v", 
err)
+               return
+       }
+       if len(services) == 0 {
+               logger.Warn("Nacos callback received an empty list of services, 
which might indicate all instances are offline.")
+               // The logic to handle removal of a service with zero instances
+               // is handled by the polling `discoverAndSubscribe` loop.
+               return
+       }
+
+       serviceName := services[0].ServiceName
+       logger.Debugf("Received callback for service: %s with %d instances", 
serviceName, len(services))
+
+       oldCache, _ := l.instanceCache.LoadOrStore(serviceName, &sync.Map{})
+       oldInstanceMap := oldCache.(*sync.Map)
+
+       newInstanceMap := &sync.Map{}
+       newEndpoints := make(map[string]*model.Endpoint)
+
+       // Process the new list from Nacos.
+       for i := range services {
+               // Also check for health
+               if !services[i].Enable || !services[i].Healthy {
+                       continue
+               }
+               instance := generateInstance(services[i])
+               endpoint := generateEndpoint(instance)
+               if endpoint == nil {
+                       continue
+               }
+               key := serviceName + constant.At + endpoint.ID
+               newInstanceMap.Store(key, instance)
+               newEndpoints[key] = endpoint
+       }
+
+       // Check for added or updated instances.
+       for key, endpoint := range newEndpoints {
+               if oldRaw, ok := oldInstanceMap.Load(key); ok {
+                       oldInstance := oldRaw.(nacosModel.Instance)
+                       newInstance, _ := newInstanceMap.Load(key)
+                       if !reflect.DeepEqual(oldInstance, newInstance) {
+                               l.handle(endpoint, remoting.EventTypeUpdate)
+                       }
+               } else {
+                       l.handle(endpoint, remoting.EventTypeAdd)
+               }
+       }
+
+       // Check for removed instances.
+       oldInstanceMap.Range(func(key, value any) bool {
+               instanceKey := key.(string)
+               if _, ok := newEndpoints[instanceKey]; !ok {
+                       instance := value.(nacosModel.Instance)
+                       endpoint := generateEndpoint(instance)
+                       l.handle(endpoint, remoting.EventTypeDel)
+               }
+               return true
+       })
+
+       // Update the cache with the new state.
+       l.instanceCache.Store(serviceName, newInstanceMap)
+}
+
+func (l *listener) handle(endpoint *model.Endpoint, action remoting.EventType) 
{
+       if endpoint == nil {
+               return
+       }
+       logger.Infof("Handling endpoint event: %v for %s at %s", action, 
endpoint.Name, endpoint.Address.Address)
+       switch action {
+       case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+               if err := l.adapterListener.OnAddEndpoint(endpoint); err != nil 
{
+                       logger.Errorf("Failed to add/update endpoint %s: %s", 
endpoint.Name, err.Error())
+               }
+       case remoting.EventTypeDel:
+               if err := l.adapterListener.OnRemoveEndpoint(endpoint); err != 
nil {
+                       logger.Errorf("Failed to remove endpoint %s: %s", 
endpoint.Name, err.Error())
+               }
+       }
+}
+
+// Close gracefully stops the listener.
+func (l *listener) Close() {
+       close(l.exit)
+       l.wg.Wait()
+}
+
+func (l *listener) unsubscribeAll() {
+       l.subscribedServices.Range(func(key, value any) bool {
+               serviceName := key.(string)
+               err := l.client.Unsubscribe(&vo.SubscribeParam{
+                       ServiceName: serviceName,
+                       GroupName:   l.regConf.Group,
+               })
+               if err != nil {
+                       logger.Errorf("Failed to unsubscribe from Nacos service 
%s on shutdown: %v", serviceName, err)
+               } else {
+                       logger.Infof("Unsubscribed from Nacos service %s on 
shutdown.", serviceName)
+               }
+               return true
+       })
+}
+
+func generateEndpoint(instance nacosModel.Instance) *model.Endpoint {
+       if instance.Metadata == nil {
+               logger.Warnf("Nacos instance metadata is empty, instance: %+v", 
instance)
+               return nil
+       }
+
+       ret := &model.Endpoint{
+               Address: model.SocketAddress{},
+               LLMMeta: &model.LLMMeta{},
+       }
+
+       err := defaults.Set(ret)
+       if err != nil {
+               logger.Warnf("Failed to set default values for endpoint: %v", 
err)
+               return nil
+       }
+
+       if ip, ok := instance.Metadata["ip"]; ok {
+               ret.Address.Address = ip
+       }
+
+       if port, ok := instance.Metadata["port"]; ok {
+               p, err := strconv.Atoi(port)
+               if err != nil {
+                       logger.Warnf("Invalid port in metadata: %s, error: %v", 
port, err)
+               }
+               ret.Address.Port = p
+       }
+
+       if id, ok := instance.Metadata["id"]; ok {
+               ret.ID = id
+       } else {
+               ret.ID, _ = uuid.GenerateUUID()
+       }
+
+       if name, ok := instance.Metadata["name"]; ok {
+               ret.Name = name
+       }
+       if address, ok := instance.Metadata["address"]; ok {
+               ret.Address.Domains = strings.Split(address, ",")
+       }
+       if apiKey, ok := instance.Metadata["llm-meta.api_key"]; ok {
+               ret.LLMMeta.APIKey = apiKey
+       }
+       if retryPolicy, ok := instance.Metadata["llm-meta.retry_policy.name"]; 
ok {
+               ret.LLMMeta.RetryPolicy.Name = model.RetryTypeValue[retryPolicy]
+       }
+       if retryConfig, ok := 
instance.Metadata["llm-meta.retry_policy.config"]; ok {
+               err := json.Unmarshal([]byte(retryConfig), 
&ret.LLMMeta.RetryPolicy.Config)
+               if err != nil {
+                       logger.Warnf("Failed to parse retry policy config JSON: 
%s, error: %v", retryConfig, err)
+               }
+       }
+       if fallback, ok := instance.Metadata["llm-meta.fallback"]; ok {
+               ret.LLMMeta.Fallback = 
strings.ToLower(strings.TrimSpace(fallback)) == "true"
+       }
+
+       ret.Metadata = instance.Metadata
+
+       return ret
+}
+
+func generateInstance(ss nacosModel.SubscribeService) nacosModel.Instance {
+       return nacosModel.Instance{
+               InstanceId:  ss.InstanceId,
+               Ip:          ss.Ip,
+               Port:        ss.Port,
+               ServiceName: ss.ServiceName,
+               Valid:       ss.Valid,
+               Enable:      ss.Enable,
+               Weight:      ss.Weight,
+               Metadata:    ss.Metadata,
+               ClusterName: ss.ClusterName,
+               Healthy:     ss.Healthy,
+       }
+}
diff --git a/pkg/adapter/llmregistry/registry/nacos/listener_test.go 
b/pkg/adapter/llmregistry/registry/nacos/listener_test.go
new file mode 100644
index 00000000..6835f7d3
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/listener_test.go
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "errors"
+       "sync"
+       "testing"
+       "time"
+)
+
+import (
+       "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+       nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+       "github.com/nacos-group/nacos-sdk-go/vo"
+
+       "github.com/stretchr/testify/assert"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+type mockNacosClient struct {
+       naming_client.INamingClient
+
+       mu                   sync.Mutex
+       servicesToReturn     nacosModel.ServiceList
+       servicesToReturnErr  error
+       subscribeCallback    func(services []nacosModel.SubscribeService, err 
error)
+       subscribedServices   map[string]struct{}
+       unsubscribedServices map[string]struct{}
+}
+
+func newMockNacosClient() *mockNacosClient {
+       return &mockNacosClient{
+               subscribedServices:   make(map[string]struct{}),
+               unsubscribedServices: make(map[string]struct{}),
+       }
+}
+
+func (m *mockNacosClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) 
(nacosModel.ServiceList, error) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       return m.servicesToReturn, m.servicesToReturnErr
+}
+
+func (m *mockNacosClient) Subscribe(param *vo.SubscribeParam) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.subscribedServices[param.ServiceName] = struct{}{}
+       m.subscribeCallback = param.SubscribeCallback
+       return nil
+}
+
+func (m *mockNacosClient) Unsubscribe(param *vo.SubscribeParam) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.unsubscribedServices[param.ServiceName] = struct{}{}
+       return nil
+}
+
+type mockAdapterListener struct {
+       mu               sync.Mutex
+       addedEndpoints   map[string]*model.Endpoint
+       removedEndpoints map[string]*model.Endpoint
+}
+
+var _ common.RegistryEventListener = (*mockAdapterListener)(nil)
+
+func newMockAdapterListener() *mockAdapterListener {
+       return &mockAdapterListener{
+               addedEndpoints:   make(map[string]*model.Endpoint),
+               removedEndpoints: make(map[string]*model.Endpoint),
+       }
+}
+
+func (m *mockAdapterListener) OnAddEndpoint(endpoint *model.Endpoint) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.addedEndpoints[endpoint.ID] = endpoint
+       return nil
+}
+
+func (m *mockAdapterListener) OnRemoveEndpoint(endpoint *model.Endpoint) error 
{
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.removedEndpoints[endpoint.ID] = endpoint
+       return nil
+}
+
+func (m *mockAdapterListener) reset() {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.addedEndpoints = make(map[string]*model.Endpoint)
+       m.removedEndpoints = make(map[string]*model.Endpoint)
+}
+
+func testSetup() (*listener, *mockNacosClient, *mockAdapterListener) {
+       client := newMockNacosClient()
+       adapterListener := newMockAdapterListener()
+       regConf := &model.Registry{Group: "test_group", Namespace: 
"test_namespace"}
+       nacosListener := newNacosListener(client, regConf, adapterListener)
+       return nacosListener, client, adapterListener
+}
+
+func TestGenerateEndpoint(t *testing.T) {
+       t.Run("Full valid metadata", func(t *testing.T) {
+               instance := nacosModel.Instance{
+                       Metadata: map[string]string{
+                               "id":                         "ep-123",
+                               "name":                       "my-llm",
+                               "ip":                         "127.0.0.1",
+                               "port":                       "8080",
+                               "address":                    
"openai.com,openai1.com",
+                               "llm-meta.retry_policy.name": 
"ExponentialBackoff",
+                               "llm-meta.fallback":          "true",
+                       },
+               }
+
+               endpoint := generateEndpoint(instance)
+               assert.NotNil(t, endpoint)
+               assert.Equal(t, "ep-123", endpoint.ID)
+               assert.Equal(t, "my-llm", endpoint.Name)
+               assert.Equal(t, "127.0.0.1", endpoint.Address.Address)
+               assert.Equal(t, 8080, endpoint.Address.Port)
+               assert.Equal(t, 2, len(endpoint.Address.Domains))
+               assert.Equal(t, "openai.com", endpoint.Address.Domains[0])
+               assert.Equal(t, "openai1.com", endpoint.Address.Domains[1])
+               assert.Equal(t, model.RetryerExponentialBackoff, 
endpoint.LLMMeta.RetryPolicy.Name)
+               assert.True(t, endpoint.LLMMeta.Fallback)
+       })
+
+       t.Run("Nil metadata", func(t *testing.T) {
+               instance := nacosModel.Instance{Metadata: nil}
+               endpoint := generateEndpoint(instance)
+               assert.Nil(t, endpoint)
+       })
+
+       t.Run("Invalid port", func(t *testing.T) {
+               instance := nacosModel.Instance{Metadata: 
map[string]string{"port": "not-a-number"}}
+               endpoint := generateEndpoint(instance)
+               assert.NotNil(t, endpoint)
+               assert.Equal(t, 0, endpoint.Address.Port)
+       })
+}
+
+func TestDiscoverAndSubscribe(t *testing.T) {
+       l, client, _ := testSetup()
+
+       t.Run("Discover and subscribe to a new service", func(t *testing.T) {
+               // CHANGE THIS LINE:
+               client.servicesToReturn = nacosModel.ServiceList{Doms: 
[]string{"service-A"}} // Was nacosModel.Service
+               l.discoverAndSubscribe()
+
+               assert.Equal(t, struct{}{}, 
client.subscribedServices["service-A"], "Should subscribe to service-A")
+               _, loaded := l.subscribedServices.Load("service-A")
+               assert.True(t, loaded, "service-A should be in the 
subscribedServices map")
+       })
+
+       t.Run("Unsubscribe from a removed service", func(t *testing.T) {
+               // Ensure service-A is already subscribed for the test setup
+               l.subscribedServices.Store("service-A", true)
+               client.servicesToReturn = nacosModel.ServiceList{Doms: 
[]string{}} // Nacos now returns an empty list
+
+               l.discoverAndSubscribe()
+
+               assert.Equal(t, struct{}{}, 
client.unsubscribedServices["service-A"], "Should unsubscribe from service-A")
+               _, loaded := l.subscribedServices.Load("service-A")
+               assert.False(t, loaded, "service-A should be removed from 
subscribedServices map")
+       })
+
+       t.Run("Handle Nacos API error", func(t *testing.T) {
+               client.subscribedServices = make(map[string]struct{})
+               l.subscribedServices.Store("stale-service", true)
+
+               client.servicesToReturnErr = errors.New("Nacos unavailable")
+               l.discoverAndSubscribe()
+
+               _, loaded := l.subscribedServices.Load("stale-service")
+               assert.True(t, loaded, "Should not change subscriptions on API 
error")
+               assert.Empty(t, client.subscribedServices, "Should not attempt 
to subscribe on API error")
+       })
+}
+
+func TestServiceCallback(t *testing.T) {
+       l, client, adapterListener := testSetup()
+
+       _ = client.Subscribe(&vo.SubscribeParam{
+               ServiceName:       "service-A",
+               SubscribeCallback: l.serviceCallback,
+       })
+
+       instance1 := nacosModel.SubscribeService{
+               InstanceId: "ep-1", ServiceName: "service-A", Enable: true, 
Healthy: true,
+               Metadata: map[string]string{"id": "ep-1", "name": "inst-1"},
+       }
+       instance2 := nacosModel.SubscribeService{
+               InstanceId: "ep-2", ServiceName: "service-A", Enable: true, 
Healthy: true,
+               Metadata: map[string]string{"id": "ep-2", "name": "inst-2"},
+       }
+
+       t.Run("Initial instance registration", func(t *testing.T) {
+               adapterListener.reset()
+               
client.subscribeCallback([]nacosModel.SubscribeService{instance1, instance2}, 
nil)
+
+               assert.Len(t, adapterListener.addedEndpoints, 2, "Should add 2 
endpoints")
+               assert.Contains(t, adapterListener.addedEndpoints, "ep-1")
+               assert.Contains(t, adapterListener.addedEndpoints, "ep-2")
+               assert.Empty(t, adapterListener.removedEndpoints, "Should not 
remove any endpoints")
+       })
+
+       t.Run("One instance is removed", func(t *testing.T) {
+               adapterListener.reset()
+               
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+               assert.Empty(t, adapterListener.addedEndpoints, "Should not add 
any new endpoints")
+               assert.Len(t, adapterListener.removedEndpoints, 1, "Should 
remove 1 endpoint")
+               assert.Contains(t, adapterListener.removedEndpoints, "ep-2")
+       })
+
+       t.Run("One instance is updated", func(t *testing.T) {
+               adapterListener.reset()
+               updatedInstance1 := instance1
+               updatedInstance1.Metadata = map[string]string{"id": "ep-1", 
"name": "inst-1-updated"}
+
+               
client.subscribeCallback([]nacosModel.SubscribeService{updatedInstance1}, nil)
+
+               assert.Len(t, adapterListener.addedEndpoints, 1, "Should fire 
an add/update event for 1 endpoint")
+               assert.Contains(t, adapterListener.addedEndpoints, "ep-1")
+               assert.Equal(t, "inst-1-updated", 
adapterListener.addedEndpoints["ep-1"].Name)
+               assert.Empty(t, adapterListener.removedEndpoints, "Should not 
remove any endpoints")
+       })
+
+       t.Run("Filter unhealthy or disabled instances", func(t *testing.T) {
+               adapterListener.reset()
+               unhealthyInstance := instance1
+               unhealthyInstance.Healthy = false
+               disabledInstance := instance2
+               disabledInstance.Enable = false
+
+               
client.subscribeCallback([]nacosModel.SubscribeService{unhealthyInstance, 
disabledInstance}, nil)
+
+               assert.Empty(t, adapterListener.addedEndpoints, "Should not add 
unhealthy/disabled endpoints")
+               assert.Len(t, adapterListener.removedEndpoints, 1, "Should 
remove the previously active endpoints")
+       })
+
+       t.Run("No changes in instances", func(t *testing.T) {
+               
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+               adapterListener.reset()
+
+               
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+               assert.Empty(t, adapterListener.addedEndpoints, "Should not 
trigger add for unchanged instance")
+               assert.Empty(t, adapterListener.removedEndpoints, "Should not 
trigger remove for unchanged instance")
+       })
+}
+
+func TestLifecycle(t *testing.T) {
+       l, _, _ := testSetup()
+
+       l.WatchAndHandle()
+
+       time.Sleep(100 * time.Millisecond)
+
+       // test that Close works without panic
+       assert.NotPanics(t, func() {
+               l.Close()
+       })
+}
diff --git a/pkg/adapter/llmregistry/registry/nacos/registry.go 
b/pkg/adapter/llmregistry/registry/nacos/registry.go
new file mode 100644
index 00000000..8ab6c8e4
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/registry.go
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+       "github.com/nacos-group/nacos-sdk-go/clients"
+       "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+       nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
+       "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+       baseRegistry 
"github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry/base"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/util/stringutil"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func init() {
+       registry.SetRegistry(constant.Nacos, newNacosRegistry)
+}
+
+type NacosRegistry struct {
+       *baseRegistry.BaseRegistry
+       nacosListener *listener
+       client        naming_client.INamingClient
+}
+
+func (n *NacosRegistry) DoSubscribe() error {
+       go n.nacosListener.WatchAndHandle()
+       return nil
+}
+
+func (n *NacosRegistry) DoUnsubscribe() error {
+       n.nacosListener.Close()
+       return nil
+}
+
+var _ registry.Registry = new(NacosRegistry)
+
+func newNacosRegistry(regConfig model.Registry, adapterListener 
common.RegistryEventListener) (registry.Registry, error) {
+       addrs, err := stringutil.GetIPAndPort(regConfig.Address)
+       if err != nil {
+               return nil, err
+       }
+
+       scs := make([]nacosConstant.ServerConfig, 0, len(addrs))
+       for _, addr := range addrs {
+               scs = append(scs, nacosConstant.ServerConfig{
+                       IpAddr: addr.IP.String(),
+                       Port:   uint64(addr.Port),
+               })
+       }
+
+       ccs := nacosConstant.NewClientConfig(
+               nacosConstant.WithNamespaceId(regConfig.Namespace),
+               nacosConstant.WithUsername(regConfig.Username),
+               nacosConstant.WithPassword(regConfig.Password),
+               nacosConstant.WithNotLoadCacheAtStart(true),
+               nacosConstant.WithUpdateCacheWhenEmpty(true))
+
+       client, err := clients.NewNamingClient(vo.NacosClientParam{
+               ServerConfigs: scs,
+               ClientConfig:  ccs,
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       nacosRegistry := &NacosRegistry{
+               client: client,
+       }
+       nacosRegistry.BaseRegistry = 
baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener)
+       nacosRegistry.nacosListener = newNacosListener(client, &regConfig, 
adapterListener)
+
+       return nacosRegistry, nil
+}
diff --git a/pkg/adapter/llmregistry/registry/registry.go 
b/pkg/adapter/llmregistry/registry/registry.go
new file mode 100644
index 00000000..6282184b
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/registry.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+       "github.com/pkg/errors"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+// A map to store registry creation functions by protocol name.
+var registryMap = make(map[string]func(model.Registry, 
common.RegistryEventListener) (Registry, error), 8)
+
+// Registry interface defines the basic features of a service registry.
+type Registry interface {
+       // Subscribe starts monitoring the target registry for service changes.
+       Subscribe() error
+       // Unsubscribe stops monitoring the target registry.
+       Unsubscribe() error
+}
+
+// SetRegistry registers a factory function for creating a new registry client.
+func SetRegistry(name string, newRegFunc func(model.Registry, 
common.RegistryEventListener) (Registry, error)) {
+       registryMap[name] = newRegFunc
+}
+
+// GetRegistry creates and returns a new registry client based on the 
configuration.
+// It panics if the registry client cannot be initialized.
+func GetRegistry(regConfig model.Registry, listener 
common.RegistryEventListener) (Registry, error) {
+       if newRegFunc, ok := registryMap[regConfig.Protocol]; ok {
+               reg, err := newRegFunc(regConfig, listener)
+               if err != nil {
+                       return nil, errors.New("Initialize Registry " + 
regConfig.Protocol + " failed due to: " + err.Error())
+               }
+               return reg, nil
+       }
+       return nil, errors.New("Registry protocol " + regConfig.Protocol + " is 
not supported")
+}
diff --git a/pkg/adapter/llmregistry/registrycenter.go 
b/pkg/adapter/llmregistry/registrycenter.go
new file mode 100644
index 00000000..6d632cba
--- /dev/null
+++ b/pkg/adapter/llmregistry/registrycenter.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package llmregistry
+
+import (
+       "os"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+       "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+       _ 
"github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry/nacos"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+       "github.com/apache/dubbo-go-pixiu/pkg/common/extension/adapter"
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+       "github.com/apache/dubbo-go-pixiu/pkg/model"
+       "github.com/apache/dubbo-go-pixiu/pkg/server"
+)
+
+const (
+       Kind = constant.LLMRegistryCenterAdapter
+)
+
+func init() {
+       adapter.RegisterAdapterPlugin(&Plugin{})
+}
+
+var (
+       _ adapter.AdapterPlugin = new(Plugin)
+       _ adapter.Adapter       = new(Adapter)
+       // Explicitly declare that Adapter implements the listener interface.
+       _ common.RegistryEventListener = new(Adapter)
+)
+
+type (
+       // Plugin to monitor LLM services on a registry center.
+       Plugin struct{}
+
+       // AdapterConfig defines the configuration for the LLM registry adapter.
+       AdapterConfig struct {
+               Registries map[string]model.Registry `yaml:"registries" 
json:"registries" mapstructure:"registries"`
+       }
+)
+
+// Kind returns the identifier of the plugin.
+func (p *Plugin) Kind() string {
+       return Kind
+}
+
+// CreateAdapter returns the LLM registry center adapter.
+func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) {
+       adapter := &Adapter{
+               id:         a.ID,
+               registries: make(map[string]registry.Registry),
+               cfg:        &AdapterConfig{Registries: 
make(map[string]model.Registry)},
+       }
+       return adapter, nil
+}
+
+// Adapter to monitor LLM services on a registry center.
+type Adapter struct {
+       id         string
+       cfg        *AdapterConfig
+       registries map[string]registry.Registry
+}
+
+// Start starts the adapter by subscribing to all configured registries.
+func (a *Adapter) Start() {
+       for name, reg := range a.registries {
+               logger.Infof("Subscribing to LLM registry: %s", name)
+               if err := reg.Subscribe(); err != nil {
+                       logger.Errorf("Failed to subscribe to registry %s: %s", 
name, err.Error())
+               }
+       }
+}
+
+// Stop stops the adapter by unsubscribing from all registries.
+func (a *Adapter) Stop() {
+       for name, reg := range a.registries {
+               if err := reg.Unsubscribe(); err != nil {
+                       logger.Errorf("Failed to unsubscribe from registry %s: 
%s", name, err.Error())
+               }
+       }
+}
+
+// Apply initializes the registries according to the configuration.
+func (a *Adapter) Apply() error {
+       nacosAddrFromEnv := 
os.Getenv(constant.EnvDubbogoPixiuNacosRegistryAddress)
+
+       for key, registryConfig := range a.cfg.Registries {
+               var err error
+               // Override address from environment variable if it's set for 
Nacos.
+               if nacosAddrFromEnv != "" && registryConfig.Protocol == 
constant.Nacos {
+                       logger.Infof("Overriding Nacos address for registry 
'%s' with environment variable: %s", key, nacosAddrFromEnv)
+                       registryConfig.Address = nacosAddrFromEnv
+               }
+
+               a.registries[key], err = registry.GetRegistry(registryConfig, a)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+// Config returns the configuration of the adapter.
+func (a *Adapter) Config() any {
+       return a.cfg
+}
+
+// OnAddEndpoint is the callback that gets triggered when a new LLM endpoint 
is discovered.
+func (a *Adapter) OnAddEndpoint(endpoint *model.Endpoint) error {
+       // The endpoint metadata MUST contain a "cluster" key to identify the 
target cluster.
+       clusterName, ok := endpoint.Metadata["cluster"]
+       if !ok || clusterName == "" {
+               logger.Warnf("Endpoint %s (ID: %s) is missing 'cluster' 
metadata, skipping.", endpoint.Name, endpoint.ID)
+               return nil
+       }
+       logger.Infof("Adding endpoint %s to cluster %s", 
endpoint.ID+constant.PathParamIdentifier+endpoint.Name, clusterName)
+       server.GetClusterManager().SetEndpoint(clusterName, endpoint)
+       return nil
+}
+
+// OnRemoveEndpoint is the callback that gets triggered when an LLM endpoint 
is removed.
+func (a *Adapter) OnRemoveEndpoint(endpoint *model.Endpoint) error {
+       // The endpoint metadata MUST contain a "cluster" key.
+       clusterName, ok := endpoint.Metadata["cluster"]
+       if !ok || clusterName == "" {
+               logger.Warnf("Endpoint %s (ID: %s) is missing 'cluster' 
metadata for removal, skipping.", endpoint.Name, endpoint.ID)
+               return nil
+       }
+       logger.Infof("Removing endpoint ID %s from cluster %s", endpoint.ID, 
clusterName)
+       server.GetClusterManager().DeleteEndpoint(clusterName, endpoint.ID)
+       return nil
+}
diff --git a/pkg/cluster/retry/countbased/count_based.go 
b/pkg/cluster/retry/countbased/count_based.go
index 7746b215..7ab33c04 100644
--- a/pkg/cluster/retry/countbased/count_based.go
+++ b/pkg/cluster/retry/countbased/count_based.go
@@ -19,6 +19,7 @@ package countbased
 
 import (
        "fmt"
+       "strconv"
 )
 
 import (
@@ -58,11 +59,31 @@ func newCountBasedRetry(config map[string]any) 
(retry.RetryPolicy, error) {
                timesValue = defaultRetryTimes
        }
 
-       timesUint, ok := timesValue.(int)
-       if !ok {
-               return nil, fmt.Errorf("invalid type for 
'retry.count_based.times', expected int but got %T", timesValue)
+       var times int
+
+       switch v := timesValue.(type) {
+       case int:
+               times = v
+       case float64:
+               if v != float64(int(v)) {
+                       return nil, fmt.Errorf("invalid float value for '%s', 
must be a whole number, but got %f", retryTimesKey, v)
+               }
+               times = int(v)
+       case string:
+               parsedTimes, err := strconv.Atoi(v)
+               if err != nil {
+                       return nil, fmt.Errorf("could not parse string value 
for '%s': %w", retryTimesKey, err)
+               }
+               times = parsedTimes
+       default:
+               return nil, fmt.Errorf("invalid type for '%s', expected a 
number or a numeric string, but got %T", retryTimesKey, timesValue)
+       }
+
+       // must be non-negative
+       if times < 0 {
+               return nil, fmt.Errorf("value for '%s' cannot be negative, but 
got %d", retryTimesKey, times)
        }
 
        // Total attempts = 1 initial try plus number of retries.
-       return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil
+       return &CountBasedRetry{MaxAttempts: uint(times) + 1}, nil
 }
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index 0400250c..b105e98e 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -66,6 +66,7 @@ const (
 const (
        SpringCloudAdapter         = "dgp.adapter.springcloud"
        DubboRegistryCenterAdapter = "dgp.adapter.dubboregistrycenter"
+       LLMRegistryCenterAdapter   = "dgp.adapter.llmregistrycenter"
        McpServerAdapter           = "dgp.adapter.mcpserver"
 )
 
diff --git a/pkg/common/constant/url.go b/pkg/common/constant/url.go
index 14aeeef0..0e205c9b 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/common/constant/url.go
@@ -29,4 +29,5 @@ const (
        // Dot defines the . which will be used to present the path to specific 
field in the body
        Dot      = "."
        AnyValue = "*"
+       At       = "@"
 )
diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go
index 33be18a8..b35b7b24 100644
--- a/pkg/model/cluster.go
+++ b/pkg/model/cluster.go
@@ -92,10 +92,10 @@ type (
                ID        string            `yaml:"ID" json:"ID"`               
                                        // ID indicate one endpoint
                Name      string            `yaml:"name" json:"name"`           
                                        // Name the endpoint unique name
                Address   SocketAddress     `yaml:"socket_address" 
json:"socket_address" mapstructure:"socket_address"` // Address socket address
-               Metadata  map[string]string `yaml:"meta" json:"meta"`           
                                        // Metadata extra info such as label or 
other meta data
+               Metadata  map[string]string `yaml:"meta" json:"meta" 
mapstructure:"meta"`                               // Metadata extra info such 
as label or other meta data
                UnHealthy bool
 
-               LLMMeta *LLMMeta `yaml:"llm_meta" json:"llm_meta"` // LLMMeta 
extra info such as label or other meta data
+               LLMMeta *LLMMeta `yaml:"llm_meta" json:"llm_meta" 
mapstructure:"llm_meta"` // LLMMeta extra info such as label or other meta data
        }
 
        // ConsistentHash methods include: RingHash, MaglevHash
diff --git a/pkg/pluginregistry/registry.go b/pkg/pluginregistry/registry.go
index 7fc267ea..2b3de88e 100644
--- a/pkg/pluginregistry/registry.go
+++ b/pkg/pluginregistry/registry.go
@@ -19,6 +19,7 @@ package pluginregistry
 
 import (
        _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry"
+       _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry"
        _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/mcpserver"
        _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud"
        _ "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer/maglev"
diff --git a/pkg/server/adapter_manager.go b/pkg/server/adapter_manager.go
index 8276d026..72d0c6d1 100644
--- a/pkg/server/adapter_manager.go
+++ b/pkg/server/adapter_manager.go
@@ -78,7 +78,7 @@ func (am *AdapterManager) initAdapters() {
 
                err = hf.Apply()
                if err != nil {
-                       logger.Error("initFilterIfNeed apply adapter error %s", 
err)
+                       logger.Errorf("initFilterIfNeed apply adapter error 
%s", err)
                }
                ads = append(ads, hf)
        }
diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go
index 599730d6..69ea4ecb 100644
--- a/pkg/server/cluster_manager.go
+++ b/pkg/server/cluster_manager.go
@@ -19,11 +19,14 @@ package server
 
 import (
        "fmt"
-       "strconv"
        "sync"
        "sync/atomic"
 )
 
+import (
+       "github.com/hashicorp/go-uuid"
+)
+
 import (
        "github.com/apache/dubbo-go-pixiu/pkg/cluster"
        "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
@@ -257,18 +260,18 @@ func (s *ClusterStore) AddCluster(c *model.ClusterConfig) 
{
                atomic.AddInt32(&clusterIndex, 1)
        }
 
-       s.AssembleClusterEndpoints(c)
+       s.assembleClusterEndpoints(c)
 
        s.Config = append(s.Config, c)
        s.clustersMap[c.Name] = cluster.NewCluster(c)
        c.CreateConsistentHash()
 }
 
-// AssembleClusterEndpoints assembles the cluster endpoints
+// assembleClusterEndpoints assembles the cluster endpoints
 // by formatting the ID, name and domains for each endpoint
 // If endpoint.LLMMeta is not nil, the assimilation of name and domain is 
based on
 // the LLM provider denoted in the endpoint LLMMeta.
-func (s *ClusterStore) AssembleClusterEndpoints(c *model.ClusterConfig) {
+func (s *ClusterStore) assembleClusterEndpoints(c *model.ClusterConfig) {
        if c == nil {
                return
        }
@@ -276,7 +279,7 @@ func (s *ClusterStore) AssembleClusterEndpoints(c 
*model.ClusterConfig) {
        for i, endpoint := range c.Endpoints {
                // If the endpoint ID is not set, set it to the index + 1
                if endpoint.ID == "" {
-                       endpoint.ID = strconv.Itoa(i + 1)
+                       endpoint.ID, _ = uuid.GenerateUUID()
                }
 
                // If the endpoint has no name, set a default name

Reply via email to