This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 96acac0c feat(ai-gateway): add nacos LLM registry support (#746)
96acac0c is described below
commit 96acac0ce3c16af5fdd0574fee003211771ca870
Author: Xuetao Li <[email protected]>
AuthorDate: Sat Nov 8 13:51:58 2025 +0800
feat(ai-gateway): add nacos LLM registry support (#746)
* feat: add nacos LLM registry support
# Conflicts:
# pkg/model/llm.go
# pkg/server/cluster_manager.go
* feat: update doc
* import-format
* feat: add api_key field to endpoint metadata and update documentation
* fix: add NOSONAR comment to checkout action in sync-to-upstream.yml
---
.github/workflows/sync-to-upstream.yml | 2 +-
docs/ai/endpoint.md | 30 +-
docs/ai/endpoint_CN.md | 30 +-
docs/ai/registry.md | 209 +++++++++++++
docs/ai/registry_CN.md | 209 +++++++++++++
go.mod | 2 +-
.../llmregistry/common/common.go} | 24 +-
.../llmregistry/registry/base/baseregistry.go | 117 +++++++
.../llmregistry/registry/base/baseregistry_test.go | 227 ++++++++++++++
.../llmregistry/registry/listener.go} | 24 +-
pkg/adapter/llmregistry/registry/nacos/listener.go | 336 +++++++++++++++++++++
.../llmregistry/registry/nacos/listener_test.go | 287 ++++++++++++++++++
pkg/adapter/llmregistry/registry/nacos/registry.go | 94 ++++++
pkg/adapter/llmregistry/registry/registry.go | 56 ++++
pkg/adapter/llmregistry/registrycenter.go | 151 +++++++++
pkg/cluster/retry/countbased/count_based.go | 29 +-
pkg/common/constant/key.go | 1 +
pkg/common/constant/url.go | 1 +
pkg/model/cluster.go | 4 +-
pkg/pluginregistry/registry.go | 1 +
pkg/server/adapter_manager.go | 2 +-
pkg/server/cluster_manager.go | 13 +-
22 files changed, 1784 insertions(+), 65 deletions(-)
diff --git a/.github/workflows/sync-to-upstream.yml
b/.github/workflows/sync-to-upstream.yml
index 3349eb5e..2cc964b0 100644
--- a/.github/workflows/sync-to-upstream.yml
+++ b/.github/workflows/sync-to-upstream.yml
@@ -106,7 +106,7 @@ jobs:
# SECURITY: Safe for pull_request_target - explicitly checks out
BASE_BRANCH, not PR head
- name: Checkout repository
if: steps.check_branch.outputs.skip != 'true'
- uses: actions/checkout@v5 # NOSONAR
+ uses: actions/checkout@v5 # NOSONAR
with:
fetch-depth: 0
ref: ${{ env.BASE_BRANCH }}
diff --git a/docs/ai/endpoint.md b/docs/ai/endpoint.md
index 1852ca13..9bdf9bd3 100644
--- a/docs/ai/endpoint.md
+++ b/docs/ai/endpoint.md
@@ -36,22 +36,26 @@ The llm_meta block holds all the configuration specific to
how the gateway shoul
- Type: `boolean`
- Description: Determines if the gateway should proceed to the next endpoint
in the cluster if all retry attempts on this endpoint fail. When the value is
`true`, and if this endpoint fails, the gateway will attempt the next available
endpoint. When the value is `false`, and if this endpoint fails, the process
stops, and the last error is returned to the client.
+`api_key`
+- Type: `string`
+- Description: The API key to be used by this endpoint. This key will be
included in the request headers when forwarding requests to the LLM service.
+
`retry_policy`
- Type: `object`
- Description: An object that defines the retry strategy to use if a request
to this endpoint fails.
-The `retry_policy` object contains the following fields:
+ The `retry_policy` object contains the following fields:
-`name`
+ `name`
-- Type: `string`
-- Description: The name of the registered retry policy to use. The name is
case-insensitive.
+ - Type: `string`
+ - Description: The name of the registered retry policy to use. The name is
case-insensitive.
-`config`
+ `config`
-- Type: `object`
-- Description: A map of key-value pairs specific to the chosen retry policy
name.
+ - Type: `object`
+ - Description: A map of key-value pairs specific to the chosen retry policy
name.
### Available Retry Policies
@@ -126,11 +130,13 @@ clusters:
# --- Primary Endpoint ---
- id: deepseek-primary
socket_address:
- domains:
- - api.deepseek.com
+ domains:
+ - api.deepseek.com
llm_meta:
# If all retries fail, move to the next endpoint.
fallback: true
+ # Your API key for this endpoint.
+ api_key: "your_deepseek_api_key_here"
# Use a robust retry strategy for the primary endpoint.
retry_policy:
name: ExponentialBackoff
@@ -143,11 +149,13 @@ clusters:
# --- Fallback Endpoint ---
- id: openai-fallback
socket_address:
- domains:
- - api.openai.com/v1
+ domains:
+ - api.openai.com/v1
llm_meta:
# This is the last resort; do not fall back further.
fallback: false
+ # Your API key for this endpoint.
+ api_key: "your_openai_api_key_here"
# Use a simpler, faster retry for the fallback.
retry_policy:
name: CountBased
diff --git a/docs/ai/endpoint_CN.md b/docs/ai/endpoint_CN.md
index b0b07e54..39a41c1e 100644
--- a/docs/ai/endpoint_CN.md
+++ b/docs/ai/endpoint_CN.md
@@ -37,22 +37,26 @@ clusters:
- `true`: 如果此endpoint失败,网关将尝试下一个可用的 endpoint。
- `false`: 如果此endpoint失败,则处理停止,并将最后一个错误返回给客户端。对于 fallback 链中的最后一个 endpoint
,此值应设置为 `false`。
+`api_key`
+- **类型**: `string`
+- **描述**: 此 endpoint 使用的 API 密钥。当将请求转发到 LLM 服务时,此密钥将包含在请求头中。
+
`retry_policy`
- **类型**: `object`
- **描述**: 一个定义了当对此endpoint的请求失败时要使用的重试策略的对象。
-`retry_policy` 对象包含以下字段:
+ `retry_policy` 对象包含以下字段:
-`name`
+ `name`
-- **类型**: `string`
-- **描述**: 要使用的已注册重试策略的名称。名称不区分大小写。
+ - **类型**: `string`
+ - **描述**: 要使用的已注册重试策略的名称。名称不区分大小写。
-`config`
+ `config`
-- **类型**: `object`
-- **描述**: 一个键值对的映射,特定于所选的重试策略名称。
+ - **类型**: `object`
+ - **描述**: 一个键值对的映射,特定于所选的重试策略名称。
### 可用的重试策略
@@ -129,11 +133,13 @@ clusters:
# --- 主endpoint ---
- id: deepseek-primary
socket_address:
- domains:
- - api.deepseek.com
+ domains:
+ - api.deepseek.com
llm_meta:
# 如果所有重试都失败,则移至下一个 endpoint。
fallback: true
+ # 此 endpoint 使用的 API 密钥。
+ api_key: "your_deepseek_api_key_here"
# 为主 endpoint 使用稳健的重试策略。
retry_policy:
name: ExponentialBackoff
@@ -146,11 +152,13 @@ clusters:
# --- fallback endpoint ---
- id: openai-fallback
socket_address:
- domains:
- - api.openai.com/v1
+ domains:
+ - api.openai.com/v1
llm_meta:
# 这是最后的选择;不要再进一步 fallback。
fallback: false
+ # 此 endpoint 使用的 API 密钥。
+ api_key: "your_openai_api_key_here"
# 为 fallback endpoint 使用更简单、更快速的重试。
retry_policy:
name: CountBased
diff --git a/docs/ai/registry.md b/docs/ai/registry.md
new file mode 100644
index 00000000..e0a465d6
--- /dev/null
+++ b/docs/ai/registry.md
@@ -0,0 +1,209 @@
+## LLM Service Discovery and Registration
+
+English | [中文](registry_CN.md)
+
+This document aims to guide LLM service providers on how to dynamically
register their service instances with the LLM Gateway via a Nacos registry. By
following these guidelines, the gateway will be able to automatically discover
your service and apply appropriate routing, retry, and fallback strategies
based on the metadata you provide.
+
+### Registration Mechanism Overview
+
+The core mechanism of service discovery is that your LLM service registers as
a **Nacos instance** and provides a specific set of **metadata** upon
registration. The LLM Gateway listens for service changes in Nacos, reads this
metadata, and dynamically converts it into a fully functional gateway
`endpoint` configuration.
+
+A basic Nacos registration request includes the following key information:
+
+- **`ServiceName`**: The name of your service collection (e.g.,
`deepseek-service`).
+- **`Ip` & `Port`**: The network address where your service instance listens
for traffic.
+- **`Metadata`**: A collection of key-value pairs, **which is crucial for
configuring all gateway behaviors**.
+
+### `metadata` Configuration Fields
+
+All gateway-specific configurations are passed through the `metadata` field of
the Nacos instance. Below are all the supported `metadata` keys and their
descriptions.
+
+`cluster`
+
+- **Type**: `string`
+- **Required**: Yes
+- **Description**: Defines which cluster (`cluster`) in the gateway this
endpoint should belong to. The gateway aggregates service instances with the
same `cluster` name based on this value.
+- **Example**: `"deepseek_cluster"`
+
+`id`
+
+- **Type**: `string`
+- **Required**: Yes
+- **Description**: The unique identifier for the endpoint. It must be unique
within its `cluster`.
+- **Example**: `"deepseek-primary-instance-1"`
+
+`ip`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: An optional parameter, useful when your service needs to
register an address with the gateway that is different from its internal IP
(e.g., a publicly accessible IP). If not provided, the gateway will use the
default value `0.0.0.0`.
+- **Example**: `"203.0.113.55"`
+
+`port`
+
+- **Type**: `string` (representing an integer)
+- **Required**: No
+- **Description**: An optional parameter to override the `Port` registered by
the Nacos instance itself. Its usage is similar to the `ip` field.
+- **Example**: `"9000"`
+
+`name`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: A human-readable name to identify this endpoint. Primarily
used for logging and monitoring.
+- **Example**: `"DeepSeek V2 Chat (Primary)"`
+
+`address`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: A comma-separated list of addresses; each entry is one address of this endpoint (an alternative to `ip` + `port`).
+- **Example**: `"api.deepseek.com"`
+
+`llm-meta.fallback`
+
+- **Type**: `string` ("true" or "false")
+- **Required**: No, defaults to `"false"`
+- **Description**: Determines whether the gateway should proceed to the next
endpoint in the cluster if all retry attempts on this endpoint fail.
+
+`llm-meta.api_key`
+
+- **Type**: `string`
+- **Required**: No
+- **Description**: The API key to be used by this endpoint.
+
+`llm-meta.retry_policy.name`
+
+- **Type**: `string`
+- **Required**: No, defaults to `"NoRetry"`
+- **Description**: Specifies the name of the retry policy to use when a
request to this endpoint fails. The name is case-insensitive.
+
+`llm-meta.retry_policy.config`
+
+- **Type**: `string` (JSON format)
+- **Required**: No
+- **Description**: A JSON string containing configuration parameters specific
to the selected retry policy.
+
+### Retry Policy Metadata
+
+You can configure retry behavior by using a combination of
`llm-meta.retry_policy.name` and `llm-meta.retry_policy.config`.
+
+1. `CountBased`
+
+ - **name**: `"CountBased"`
+ - **config**: A JSON string containing a `times` field.
+ - `times` (integer): Number of retry attempts.
+ - **Example**:
+ - `llm-meta.retry_policy.name`: `"CountBased"`
+ - `llm-meta.retry_policy.config`: `{"times": 2}`
+
+2. `ExponentialBackoff`
+
+ - **name**: `"ExponentialBackoff"`
+ - **config**: A JSON string containing `times`, `initialInterval`,
`maxInterval`, and `multiplier` fields.
+ - `times` (integer): Number of retries.
+ - `initialInterval` (string): Initial wait duration (e.g., "200ms").
+ - `maxInterval` (string): Maximum wait duration (e.g., "5s").
+ - `multiplier` (float): The multiplier factor for the delay.
+ - **Example**:
+ - `llm-meta.retry_policy.name`: `"ExponentialBackoff"`
+ - `llm-meta.retry_policy.config`: `{"times": 3, "initialInterval":
"200ms", "maxInterval": "5s", "multiplier": 2.0}`
+
+3. `NoRetry`
+
+ - **name**: `"NoRetry"`
+ - **config**: This policy does not require a `config` field.
+
+### Complete Registration Example (Go)
+
+For detailed usage, please refer to our [official
samples](https://github.com/apache/dubbo-go-pixiu-samples/tree/main/llm/nacos).
+
+The Pixiu configuration file needs to enable the `llmregistrycenter` adapter,
as follows:
+
+```yaml
+ adapters:
+ - id: test
+ name: dgp.adapter.llmregistrycenter
+ config:
+ registries:
+ nacos:
+ protocol: nacos
+ address: "127.0.0.1:8848"
+ timeout: "5s"
+ group: test_llm_registry_group
+ namespace: public
+```
+
+This example demonstrates how to register a fully-featured LLM service
instance using the Nacos Go SDK. The instance will be configured as the primary
endpoint in the `deepseek_cluster`, using an exponential backoff retry policy,
and will fall back to the next service in the cluster upon failure.
+
+```go
+package main
+
+import (
+ "encoding/json"
+ "log"
+
+ "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+func main() {
+ // ... (Code for creating the Nacos client is omitted here)
+ // client, err := createNacosClient()
+
+ // 1. Prepare the JSON configuration for the retry policy
+ retryConfig := map[string]interface{}{
+ "times": 3,
+ "initialInterval": "200ms",
+ "maxInterval": "8s",
+ "multiplier": 2.5,
+ }
+ retryConfigJSON, _ := json.Marshal(retryConfig)
+
+ // 2. Construct the metadata containing all gateway configurations
+ metadata := map[string]string{
+ // --- Core Endpoint Configuration ---
+ "cluster": "deepseek_cluster",
+ "id": "deepseek-primary",
+ "name": "DeepSeek V2 Chat (Primary)",
+
+ // Optional (use ip+port or address): The instance's IP and Port
+ "ip": "203.0.113.55",
+ "port": "9000",
+
+ // Optional (use ip+port or address): address field
+ "address": "api.deepseek.com",
+
+ // --- LLM-Specific Metadata ---
+ "llm-meta.fallback": "true",
+
+ // API Keys in JSON string format
+ "llm-meta.api_keys": "key-xxxxxxxx",
+
+ // --- Retry Policy Configuration ---
+ "llm-meta.retry_policy.name": "ExponentialBackoff",
+ "llm-meta.retry_policy.config": string(retryConfigJSON),
+ }
+
+ // 3. Register the Nacos instance
+ // Note: The Ip and Port here are the actual listening addresses of the
service instance,
+ // while the ip and port in the metadata are the addresses you want the
gateway to access.
+ _, err := client.RegisterInstance(vo.RegisterInstanceParam{
+ Ip: "192.168.1.10", // The service's internal IP
+ Port: 8001, // The service's internal port
+ ServiceName: "deepseek-service",
+ GroupName: "DEFAULT_GROUP",
+ Ephemeral: true,
+ Healthy: true,
+ Weight: 10,
+ Metadata: metadata,
+ })
+
+ if err != nil {
+ log.Fatalf("Failed to register service instance: %v", err)
+ }
+
+ log.Println("Service instance registered successfully!")
+ // ...
+}
+
+```
\ No newline at end of file
diff --git a/docs/ai/registry_CN.md b/docs/ai/registry_CN.md
new file mode 100644
index 00000000..a19e4df7
--- /dev/null
+++ b/docs/ai/registry_CN.md
@@ -0,0 +1,209 @@
+## LLM 服务发现与注册
+
+[English](registry.md) | 中文
+
+本文档旨在指导 LLM 服务提供商如何通过 Nacos 注册中心,将其服务实例动态地注册到 LLM
网关。通过遵循这些准则,网关将能够自动发现您的服务,并根据您提供的元数据(metadata)应用相应的路由、重试和 fallback 策略。
+
+### 注册机制概述
+
+服务发现的核心机制是:您的 LLM 服务作为一个**Nacos 实例**进行注册,并在注册时提供一组特定的**元数据**。LLM 网关会监听 Nacos
中的服务变更,读取这些元数据,并将其动态地转换为一个功能齐全的网关 `endpoint` 配置。
+
+一个基本的 Nacos 注册请求包含以下关键信息:
+
+- **`ServiceName`**: 您的服务集合的名称 (例如, `deepseek-service`)。
+- **`Ip` & `Port`**: 您的服务实例监听流量的网络地址。
+- **`Metadata`**: 一个键值对集合,**这是配置所有网关行为的关键**。
+
+### `metadata` 配置字段
+
+所有网关的特定配置都通过 Nacos 实例的 `metadata` 字段进行传递。以下是所有支持的 `metadata` 键及其说明。
+
+`cluster`
+
+- **类型**: `string`
+- **必需**: 是
+- **描述**: 定义此 endpoint 应归属到网关中的哪个集群 (`cluster`)。网关会根据此值将具有相同 `cluster`
名称的服务实例聚合在一起。
+- **示例**: `"deepseek_cluster"`
+
+`id`
+
+- **类型**: `string`
+- **必需**: 是
+- **描述**: endpoint 的唯一标识符。它在所属的 `cluster` 内必须是唯一的。
+- **示例**: `"deepseek-primary-instance-1"`
+
+`ip`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 可选参数,当您的服务需要向网关注册一个不同于其内部 IP 的地址时(例如,一个可公开访问的
IP),此字段非常有用。如果未提供,网关将使用默认值 `0.0.0.0`。
+- **示例**: `"203.0.113.55"`
+
+`port`
+
+- **类型**: `string` (代表一个整数)
+- **必需**: 否
+- **描述**: 可选参数,用于覆盖 Nacos 实例本身注册的 `Port`。用法与 `ip` 字段类似。
+- **示例**: `"9000"`
+
+`name`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 一个人类可读的名称,用于标识此 endpoint。主要用于日志和监控。
+- **示例**: `"DeepSeek V2 Chat (Primary)"`
+
+`address`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 以逗号分隔的地址列表,每一项代表此 endpoint 的一个地址(可替代 `ip` + `port`)。
+- **示例**: `"api.deepseek.com"`
+
+`llm-meta.fallback`
+
+- **类型**: `string` ("true" 或 "false")
+- **必需**: 否, 默认为 `"false"`
+- **描述**: 决定如果在此 endpoint 上的所有重试尝试都失败后,网关是否应继续处理集群中的下一个 endpoint。
+
+`llm-meta.api_key`
+
+- **类型**: `string`
+- **必需**: 否
+- **描述**: 此 endpoint 使用的 API 密钥。
+
+`llm-meta.retry_policy.name`
+
+- **类型**: `string`
+- **必需**: 否, 默认为 `"NoRetry"`
+- **描述**: 指定当对此 endpoint 的请求失败时要使用的重试策略的名称。名称不区分大小写。
+
+`llm-meta.retry_policy.config`
+
+- **类型**: `string` (JSON 格式)
+- **必需**: 否
+- **描述**: 一个 JSON 字符串,包含特定于所选重试策略的配置参数。
+
+### 重试策略元数据
+
+您可以通过组合使用 `llm-meta.retry_policy.name` 和 `llm-meta.retry_policy.config`
来配置重试行为。
+
+1. `CountBased` (基于次数)
+
+ - **name**: `"CountBased"`
+ - **config**: 一个包含 `times` 字段的 JSON 字符串。
+ - `times` (integer): 重试的次数。
+ - **示例**:
+ - `llm-meta.retry_policy.name`: `"CountBased"`
+ - `llm-meta.retry_policy.config`: `{"times": 2}`
+
+2. `ExponentialBackoff` (指数退避)
+
+ - **name**: `"ExponentialBackoff"`
+ - **config**: 一个包含 `times`, `initialInterval`, `maxInterval`, 和
`multiplier` 字段的 JSON 字符串。
+ - `times` (integer): 重试次数。
+ - `initialInterval` (string): 初始等待时长 (例如 "200ms")。
+ - `maxInterval` (string): 最大等待时长 (例如 "5s")。
+ - `multiplier` (float): 延迟时间的乘数因子。
+ - **示例**:
+ - `llm-meta.retry_policy.name`: `"ExponentialBackoff"`
+ - `llm-meta.retry_policy.config`: `{"times": 3, "initialInterval":
"200ms", "maxInterval": "5s", "multiplier": 2.0}`
+
+3. `NoRetry` (不重试)
+
+ - **name**: `"NoRetry"`
+ - **config**: 此策略不需要 `config` 字段。
+
+### 完整注册示例 (Go)
+
+更详细的使用方法可以参考[官方示例](https://github.com/apache/dubbo-go-pixiu-samples/tree/main/llm/nacos)。
+
+在 Pixiu 配置文件中,需要启用 `llmregistrycenter` 适配器,示例如下:
+
+```yaml
+ adapters:
+ - id: test
+ name: dgp.adapter.llmregistrycenter
+ config:
+ registries:
+ nacos:
+ protocol: nacos
+ address: "127.0.0.1:8848"
+ timeout: "5s"
+ group: test_llm_registry_group
+ namespace: public
+```
+
+此示例展示了如何使用 Nacos Go SDK 注册一个功能完整的 LLM 服务实例。该实例将被配置为 `deepseek_cluster` 中的主
endpoint,使用指数退避重试策略,并在失败时 fallback 到集群中的下一个服务。
+
+```go
+package main
+
+import (
+ "encoding/json"
+ "log"
+
+ "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+func main() {
+ // ... (此处省略了创建 Nacos 客户端的代码)
+ // client, err := createNacosClient()
+
+ // 1. 准备重试策略的 JSON 配置
+ retryConfig := map[string]interface{}{
+ "times": 3,
+ "initialInterval": "200ms",
+ "maxInterval": "8s",
+ "multiplier": 2.5,
+ }
+ retryConfigJSON, _ := json.Marshal(retryConfig)
+
+ // 2. 构造包含所有网关配置的 metadata
+ metadata := map[string]string{
+ // --- 核心 Endpoint 配置 ---
+ "cluster": "deepseek_cluster",
+ "id": "deepseek-primary",
+ "name": "DeepSeek V2 Chat (Primary)",
+
+ // 可选: (使用ip+port或者address): 实例的 IP 和 Port
+ "ip": "203.0.113.55",
+ "port": "9000",
+
+ // 可选: (使用ip+port或者address): address 列表
+ "address": "api.deepseek.com",
+
+ // --- LLM 特定元数据 ---
+ "llm-meta.fallback": "true",
+
+ // 使用 JSON 字符串格式的 API Keys
+ "llm-meta.api_key": "key-xxxxxxxx",
+
+ // --- 重试策略配置 ---
+ "llm-meta.retry_policy.name": "ExponentialBackoff",
+ "llm-meta.retry_policy.config": string(retryConfigJSON),
+ }
+
+ // 3. 注册 Nacos 实例
+ // 注意:这里的 Ip 和 Port 是服务实例的实际监听地址,
+ // 而 metadata 中的 ip 和 port 是希望网关访问的地址。
+ _, err := client.RegisterInstance(vo.RegisterInstanceParam{
+ Ip: "192.168.1.10", // 服务的内部 IP
+ Port: 8001, // 服务的内部端口
+ ServiceName: "deepseek-service",
+ GroupName: "DEFAULT_GROUP",
+ Ephemeral: true,
+ Healthy: true,
+ Weight: 10,
+ Metadata: metadata,
+ })
+
+ if err != nil {
+ log.Fatalf("注册服务实例失败: %v", err)
+ }
+
+ log.Println("服务实例注册成功!")
+ // ...
+}
+
+```
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 9304f06e..e14a1bba 100644
--- a/go.mod
+++ b/go.mod
@@ -28,6 +28,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
+ github.com/hashicorp/go-uuid v1.0.3
github.com/jhump/protoreflect v1.17.0
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/lestrrat-go/httprc/v3 v3.0.1
@@ -155,7 +156,6 @@ require (
github.com/grpc-ecosystem/grpc-opentracing
v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
- github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/vault/sdk v0.7.0 // indirect
diff --git a/pkg/common/constant/url.go
b/pkg/adapter/llmregistry/common/common.go
similarity index 59%
copy from pkg/common/constant/url.go
copy to pkg/adapter/llmregistry/common/common.go
index 14aeeef0..dc2ef0ef 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/adapter/llmregistry/common/common.go
@@ -14,19 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package constant
-const (
- // RequestBody name of api config mapping from/to
- RequestBody = "requestBody"
- // QueryStrings name of api config mapping from/to
- QueryStrings = "queryStrings"
- // Headers name of api config mapping from/to
- Headers = "headers"
- // RequestURI name of api config mapping from/to, retrieve parameters
from uri
- // for instance, https://test.com/:id uri.id will retrieve the :id
parameter
- RequestURI = "uri"
- // Dot defines the . which will be used to present the path to specific
field in the body
- Dot = "."
- AnyValue = "*"
+package common
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
)
+
+type RegistryEventListener interface {
+ OnAddEndpoint(r *model.Endpoint) error
+ OnRemoveEndpoint(r *model.Endpoint) error
+ //OnDeleteRouter(r config.Resource) error
+}
diff --git a/pkg/adapter/llmregistry/registry/base/baseregistry.go
b/pkg/adapter/llmregistry/registry/base/baseregistry.go
new file mode 100644
index 00000000..18f2d8e8
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/base/baseregistry.go
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package baseregistry
+
+import (
+ "sync"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+)
+
// FacadeRegistry is the backend-specific part of a registry. BaseRegistry
// forwards its Subscribe and Unsubscribe calls to these two methods.
type FacadeRegistry interface {
	// DoSubscribe starts monitoring the registry cluster for changes.
	DoSubscribe() error

	// DoUnsubscribe stops monitoring the registry cluster.
	DoUnsubscribe() error
}
+
+type SvcListeners struct {
+ // listeners use url.ServiceKey as the index.
+ listeners map[string]registry.Listener
+ listenerLock sync.Mutex
+}
+
+// GetListener returns existing listener or nil
+func (s *SvcListeners) GetListener(id string) registry.Listener {
+ s.listenerLock.Lock()
+ defer s.listenerLock.Unlock()
+ listener, ok := s.listeners[id]
+ if !ok {
+ return nil
+ }
+ return listener
+}
+
+// SetListener set the listener to the registry
+func (s *SvcListeners) SetListener(id string, listener registry.Listener) {
+ s.listenerLock.Lock()
+ defer s.listenerLock.Unlock()
+ s.listeners[id] = listener
+}
+
+// RemoveListener removes the listener of the registry
+func (s *SvcListeners) RemoveListener(id string) {
+ s.listenerLock.Lock()
+ defer s.listenerLock.Unlock()
+ delete(s.listeners, id)
+}
+
+func (s *SvcListeners) GetAllListener() map[string]registry.Listener {
+ s.listenerLock.Lock()
+ defer s.listenerLock.Unlock()
+ return s.listeners
+}
+
+type BaseRegistry struct {
+ svcListeners *SvcListeners
+ facadeRegistry FacadeRegistry
+ AdapterListener common.RegistryEventListener
+}
+
+func NewBaseRegistry(facade FacadeRegistry, adapterListener
common.RegistryEventListener) *BaseRegistry {
+ return &BaseRegistry{
+ facadeRegistry: facade,
+ svcListeners: &SvcListeners{
+ listeners: make(map[string]registry.Listener),
+ },
+ AdapterListener: adapterListener,
+ }
+}
+
+// GetSvcListener returns existing listener or nil
+func (r *BaseRegistry) GetSvcListener(id string) registry.Listener {
+ return r.svcListeners.GetListener(id)
+}
+
+// GetAllSvcListener get all the listener of the registry
+func (r *BaseRegistry) GetAllSvcListener() map[string]registry.Listener {
+ return r.svcListeners.GetAllListener()
+}
+
+// SetSvcListener set the listener to the registry
+func (r *BaseRegistry) SetSvcListener(id string, listener registry.Listener) {
+ r.svcListeners.SetListener(id, listener)
+}
+
+// RemoveSvcListener remove the listener of the registry
+func (r *BaseRegistry) RemoveSvcListener(id string) {
+ r.svcListeners.RemoveListener(id)
+}
+
+// Subscribe monitors the target registry.
+func (r *BaseRegistry) Subscribe() error {
+ return r.facadeRegistry.DoSubscribe()
+}
+
+// Unsubscribe stops monitoring the target registry.
+func (r *BaseRegistry) Unsubscribe() error {
+ return r.facadeRegistry.DoUnsubscribe()
+}
diff --git a/pkg/adapter/llmregistry/registry/base/baseregistry_test.go
b/pkg/adapter/llmregistry/registry/base/baseregistry_test.go
new file mode 100644
index 00000000..146947b4
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/base/baseregistry_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package baseregistry
+
+import (
+ "errors"
+ "sync"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
// mockFacadeRegistry is a test double for FacadeRegistry. It records which
// methods were invoked and returns whatever error was configured for each.
type mockFacadeRegistry struct {
	subscribeCalled   bool
	unsubscribeCalled bool
	subscribeErr      error
	unsubscribeErr    error
}

// DoSubscribe flags the call and hands back the configured subscribeErr.
func (f *mockFacadeRegistry) DoSubscribe() error {
	f.subscribeCalled = true
	return f.subscribeErr
}

// DoUnsubscribe flags the call and hands back the configured unsubscribeErr.
func (f *mockFacadeRegistry) DoUnsubscribe() error {
	f.unsubscribeCalled = true
	return f.unsubscribeErr
}
+
// mockListener is a stub registry.Listener. BaseRegistry only stores and
// returns listeners, so the tests in this file do not exercise its behavior.
type mockListener struct{}

// WatchAndHandle is intentionally unimplemented and panics if called.
func (*mockListener) WatchAndHandle() {
	panic("implement me") // NOSONAR
}

// Close is a no-op.
func (*mockListener) Close() {} // NOSONAR
+
+// mockAdapterListener is a mock implementation of the
common.RegistryEventListener interface.
+type mockAdapterListener struct{}
+
+func (m *mockAdapterListener) OnAddEndpoint(r *model.Endpoint) error {
+ panic("implement me") // NOSONAR
+}
+
+func (m *mockAdapterListener) OnRemoveEndpoint(r *model.Endpoint) error {
+ panic("implement me") // NOSONAR
+}
+
+func TestSvcListeners(t *testing.T) {
+ // Initialization
+ svcListeners := &SvcListeners{
+ listeners: make(map[string]registry.Listener),
+ }
+ mockL := &mockListener{}
+ id := "test-service-1"
+
+ // 1. Test SetListener and GetListener
+ t.Run("SetAndGet", func(t *testing.T) {
+ // Try to get a non-existent listener
+ listener := svcListeners.GetListener(id)
+ assert.Nil(t, listener, "Getting a non-existent listener should
return nil")
+
+ // Set and then get an existing listener
+ svcListeners.SetListener(id, mockL)
+ listener = svcListeners.GetListener(id)
+ assert.NotNil(t, listener, "Getting an existing listener should
not return nil")
+ assert.Equal(t, mockL, listener, "The retrieved listener should
be the same one that was set")
+ })
+
+ // 2. Test GetAllListener
+ t.Run("GetAll", func(t *testing.T) {
+ id2 := "test-service-2"
+ mockL2 := &mockListener{}
+ svcListeners.SetListener(id2, mockL2)
+
+ allListeners := svcListeners.GetAllListener()
+ assert.Len(t, allListeners, 2, "GetAllListener should return
all set listeners")
+ assert.Contains(t, allListeners, id, "The returned map should
contain the first listener")
+ assert.Contains(t, allListeners, id2, "The returned map should
contain the second listener")
+ })
+
+ // 3. Test RemoveListener
+ t.Run("Remove", func(t *testing.T) {
+ svcListeners.RemoveListener(id)
+ listener := svcListeners.GetListener(id)
+ assert.Nil(t, listener, "Getting a removed listener should
return nil")
+
+ allListeners := svcListeners.GetAllListener()
+ assert.Len(t, allListeners, 1, "The total count of listeners
should decrease after removal")
+ assert.NotContains(t, allListeners, id, "The returned map
should no longer contain the removed listener")
+ })
+}
+
+// Test the concurrency safety of SvcListeners.
+func TestSvcListeners_concurrency(t *testing.T) {
+ svcListeners := &SvcListeners{
+ listeners: make(map[string]registry.Listener),
+ }
+ id := "concurrent-service"
+ mockL := &mockListener{}
+
+ var wg sync.WaitGroup
+ // Start multiple goroutines to read and write concurrently
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ svcListeners.SetListener(id, mockL)
+ _ = svcListeners.GetListener(id)
+ svcListeners.RemoveListener(id)
+ }()
+ }
+
+ wg.Wait()
+ // The main purpose of this test is to detect race conditions.
+ // Running tests with the `go test -race` command can expose such
issues.
+ // If the test completes without errors, it indicates the lock
mechanism is working correctly.
+}
+
+// --- Tests for BaseRegistry ---
+
+// Test the constructor NewBaseRegistry.
+func TestNewBaseRegistry(t *testing.T) {
+ mockFacade := &mockFacadeRegistry{}
+ mockAdapter := &mockAdapterListener{}
+
+ br := NewBaseRegistry(mockFacade, mockAdapter)
+
+ assert.NotNil(t, br, "NewBaseRegistry should not return nil")
+ assert.Equal(t, mockFacade, br.facadeRegistry, "facadeRegistry should
be initialized correctly")
+ assert.Equal(t, mockAdapter, br.AdapterListener, "AdapterListener
should be initialized correctly")
+ assert.NotNil(t, br.svcListeners, "svcListeners should be initialized")
+ assert.Empty(t, br.svcListeners.listeners, "The svcListeners map should
be empty initially")
+}
+
+// Test the listener management methods of BaseRegistry.
+func TestBaseRegistry_listenerMethods(t *testing.T) {
+ br := NewBaseRegistry(&mockFacadeRegistry{}, &mockAdapterListener{})
+ mockL := &mockListener{}
+ id := "test-service"
+
+ // Test SetSvcListener and GetSvcListener
+ l := br.GetSvcListener(id)
+ assert.Nil(t, l)
+
+ br.SetSvcListener(id, mockL)
+ l = br.GetSvcListener(id)
+ assert.NotNil(t, l)
+ assert.Equal(t, mockL, l)
+
+ // Test GetAllSvcListener
+ allListeners := br.GetAllSvcListener()
+ assert.Len(t, allListeners, 1)
+ assert.Equal(t, mockL, allListeners[id])
+
+ // Test RemoveSvcListener
+ br.RemoveSvcListener(id)
+ l = br.GetSvcListener(id)
+ assert.Nil(t, l)
+ allListeners = br.GetAllSvcListener()
+ assert.Empty(t, allListeners)
+}
+
+// Test the Subscribe and Unsubscribe methods.
+func TestBaseRegistry_subscribeUnsubscribe(t *testing.T) {
+ t.Run("Success", func(t *testing.T) {
+ mockFacade := &mockFacadeRegistry{}
+ br := NewBaseRegistry(mockFacade, &mockAdapterListener{})
+
+ // Test Subscribe
+ err := br.Subscribe()
+ assert.NoError(t, err)
+ assert.True(t, mockFacade.subscribeCalled,
"facadeRegistry.DoSubscribe should have been called")
+
+ // Test Unsubscribe
+ err = br.Unsubscribe()
+ assert.NoError(t, err)
+ assert.True(t, mockFacade.unsubscribeCalled,
"facadeRegistry.DoUnsubscribe should have been called")
+ })
+
+ t.Run("Error", func(t *testing.T) {
+ subscribeError := errors.New("failed to subscribe")
+ unsubscribeError := errors.New("failed to unsubscribe")
+
+ mockFacade := &mockFacadeRegistry{
+ subscribeErr: subscribeError,
+ unsubscribeErr: unsubscribeError,
+ }
+ br := NewBaseRegistry(mockFacade, &mockAdapterListener{})
+
+ // Test Subscribe error propagation
+ err := br.Subscribe()
+ assert.Error(t, err)
+ assert.Equal(t, subscribeError, err, "Subscribe should
propagate the error from facadeRegistry")
+
+ // Test Unsubscribe error propagation
+ err = br.Unsubscribe()
+ assert.Error(t, err)
+ assert.Equal(t, unsubscribeError, err, "Unsubscribe should
propagate the error from facadeRegistry")
+ })
+}
diff --git a/pkg/common/constant/url.go
b/pkg/adapter/llmregistry/registry/listener.go
similarity index 58%
copy from pkg/common/constant/url.go
copy to pkg/adapter/llmregistry/registry/listener.go
index 14aeeef0..3f4c1bcd 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/adapter/llmregistry/registry/listener.go
@@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package constant
-const (
- // RequestBody name of api config mapping from/to
- RequestBody = "requestBody"
- // QueryStrings name of api config mapping from/to
- QueryStrings = "queryStrings"
- // Headers name of api config mapping from/to
- Headers = "headers"
- // RequestURI name of api config mapping from/to, retrieve parameters
from uri
- // for instance, https://test.com/:id uri.id will retrieve the :id
parameter
- RequestURI = "uri"
- // Dot defines the . which will be used to present the path to specific
field in the body
- Dot = "."
- AnyValue = "*"
-)
+package registry
+
// Listener watches a service registry (for example Nacos, Consul or ZooKeeper)
// and turns what it finds there into service updates.
type Listener interface {
	// WatchAndHandle starts watching the registry and handling its change events.
	WatchAndHandle()
	// Close stops the listener.
	Close()
}
diff --git a/pkg/adapter/llmregistry/registry/nacos/listener.go
b/pkg/adapter/llmregistry/registry/nacos/listener.go
new file mode 100644
index 00000000..ff064a80
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/listener.go
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "encoding/json"
+ "reflect"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/remoting"
+
+ "github.com/creasty/defaults"
+
+ "github.com/hashicorp/go-uuid"
+
+ "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
// ServicePollingInterval is the period between two requests for the full Nacos
// service list; polling is how newly registered services get discovered.
const ServicePollingInterval = 30 * time.Second
+
+// listener monitors Nacos for service changes.
+type listener struct {
+ client naming_client.INamingClient
+ // Caches the last known instances for a given service. Key:
ServiceName, Value: *sync.Map
+ instanceCache sync.Map
+ // Caches the set of services we are currently subscribed to. Key:
ServiceName, Value: bool
+ subscribedServices sync.Map
+ regConf *model.Registry
+ adapterListener common.RegistryEventListener // The callback to
notify the gateway core.
+
+ exit chan struct{}
+ wg sync.WaitGroup
+}
+
+// newNacosListener creates a new Nacos service listener.
+func newNacosListener(client naming_client.INamingClient, regConf
*model.Registry, adapterListener common.RegistryEventListener) *listener {
+ return &listener{
+ client: client,
+ exit: make(chan struct{}),
+ regConf: regConf,
+ adapterListener: adapterListener,
+ }
+}
+
+// WatchAndHandle starts the background goroutine to watch for service changes.
+func (l *listener) WatchAndHandle() {
+ l.wg.Add(1)
+ go l.watchForServices()
+}
+
+// watchForServices periodically polls Nacos to discover new services to
subscribe to.
+// The actual instance updates for subscribed services are push-based via the
callback.
+func (l *listener) watchForServices() {
+ defer l.wg.Done()
+
+ ticker := time.NewTicker(ServicePollingInterval)
+ defer ticker.Stop()
+
+ // Perform an initial check immediately.
+ l.discoverAndSubscribe()
+
+ for {
+ select {
+ case <-l.exit:
+ logger.Info("Nacos listener is stopping...")
+ l.unsubscribeAll()
+ return
+ case <-ticker.C:
+ l.discoverAndSubscribe()
+ }
+ }
+}
+
+func (l *listener) discoverAndSubscribe() {
+ serviceList, err :=
l.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+ GroupName: l.regConf.Group,
+ NameSpace: l.regConf.Namespace,
+ })
+ if err != nil {
+ logger.Warnf("Failed to get service list from Nacos: %v", err)
+ return
+ }
+
+ currentServices := make(map[string]struct{})
+ for _, serviceName := range serviceList.Doms {
+ currentServices[serviceName] = struct{}{}
+ // If we aren't already subscribed to this service, subscribe
now.
+ if _, loaded := l.subscribedServices.LoadOrStore(serviceName,
true); !loaded {
+ err := l.client.Subscribe(&vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: l.regConf.Group,
+ SubscribeCallback: l.serviceCallback,
+ })
+ if err != nil {
+ logger.Errorf("Failed to subscribe to Nacos
service %s: %v", serviceName, err)
+ l.subscribedServices.Delete(serviceName) //
Remove from map to retry next time.
+ } else {
+ logger.Infof("Successfully subscribed to Nacos
service: %s", serviceName)
+ }
+ }
+ }
+
+ // Unsubscribe from services that no longer exist.
+ l.subscribedServices.Range(func(key, value any) bool {
+ serviceName := key.(string)
+ if _, exists := currentServices[serviceName]; !exists {
+ err := l.client.Unsubscribe(&vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: l.regConf.Group,
+ })
+ if err != nil {
+ logger.Errorf("Failed to unsubscribe from Nacos
service %s: %v", serviceName, err)
+ } else {
+ logger.Infof("Successfully unsubscribed from
Nacos service: %s", serviceName)
+ l.subscribedServices.Delete(serviceName)
+ l.instanceCache.Delete(serviceName)
+ }
+ }
+ return true
+ })
+}
+
+// serviceCallback is the function that Nacos SDK invokes when there's a change
+// in the instances of a subscribed service. This is the injected callback.
+func (l *listener) serviceCallback(services []nacosModel.SubscribeService, err
error) {
+ if err != nil {
+ logger.Errorf("Nacos subscribe callback received an error: %v",
err)
+ return
+ }
+ if len(services) == 0 {
+ logger.Warn("Nacos callback received an empty list of services,
which might indicate all instances are offline.")
+ // The logic to handle removal of a service with zero instances
+ // is handled by the polling `discoverAndSubscribe` loop.
+ return
+ }
+
+ serviceName := services[0].ServiceName
+ logger.Debugf("Received callback for service: %s with %d instances",
serviceName, len(services))
+
+ oldCache, _ := l.instanceCache.LoadOrStore(serviceName, &sync.Map{})
+ oldInstanceMap := oldCache.(*sync.Map)
+
+ newInstanceMap := &sync.Map{}
+ newEndpoints := make(map[string]*model.Endpoint)
+
+ // Process the new list from Nacos.
+ for i := range services {
+ // Also check for health
+ if !services[i].Enable || !services[i].Healthy {
+ continue
+ }
+ instance := generateInstance(services[i])
+ endpoint := generateEndpoint(instance)
+ if endpoint == nil {
+ continue
+ }
+ key := serviceName + constant.At + endpoint.ID
+ newInstanceMap.Store(key, instance)
+ newEndpoints[key] = endpoint
+ }
+
+ // Check for added or updated instances.
+ for key, endpoint := range newEndpoints {
+ if oldRaw, ok := oldInstanceMap.Load(key); ok {
+ oldInstance := oldRaw.(nacosModel.Instance)
+ newInstance, _ := newInstanceMap.Load(key)
+ if !reflect.DeepEqual(oldInstance, newInstance) {
+ l.handle(endpoint, remoting.EventTypeUpdate)
+ }
+ } else {
+ l.handle(endpoint, remoting.EventTypeAdd)
+ }
+ }
+
+ // Check for removed instances.
+ oldInstanceMap.Range(func(key, value any) bool {
+ instanceKey := key.(string)
+ if _, ok := newEndpoints[instanceKey]; !ok {
+ instance := value.(nacosModel.Instance)
+ endpoint := generateEndpoint(instance)
+ l.handle(endpoint, remoting.EventTypeDel)
+ }
+ return true
+ })
+
+ // Update the cache with the new state.
+ l.instanceCache.Store(serviceName, newInstanceMap)
+}
+
+func (l *listener) handle(endpoint *model.Endpoint, action remoting.EventType)
{
+ if endpoint == nil {
+ return
+ }
+ logger.Infof("Handling endpoint event: %v for %s at %s", action,
endpoint.Name, endpoint.Address.Address)
+ switch action {
+ case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+ if err := l.adapterListener.OnAddEndpoint(endpoint); err != nil
{
+ logger.Errorf("Failed to add/update endpoint %s: %s",
endpoint.Name, err.Error())
+ }
+ case remoting.EventTypeDel:
+ if err := l.adapterListener.OnRemoveEndpoint(endpoint); err !=
nil {
+ logger.Errorf("Failed to remove endpoint %s: %s",
endpoint.Name, err.Error())
+ }
+ }
+}
+
+// Close gracefully stops the listener.
+func (l *listener) Close() {
+ close(l.exit)
+ l.wg.Wait()
+}
+
+func (l *listener) unsubscribeAll() {
+ l.subscribedServices.Range(func(key, value any) bool {
+ serviceName := key.(string)
+ err := l.client.Unsubscribe(&vo.SubscribeParam{
+ ServiceName: serviceName,
+ GroupName: l.regConf.Group,
+ })
+ if err != nil {
+ logger.Errorf("Failed to unsubscribe from Nacos service
%s on shutdown: %v", serviceName, err)
+ } else {
+ logger.Infof("Unsubscribed from Nacos service %s on
shutdown.", serviceName)
+ }
+ return true
+ })
+}
+
+func generateEndpoint(instance nacosModel.Instance) *model.Endpoint {
+ if instance.Metadata == nil {
+ logger.Warnf("Nacos instance metadata is empty, instance: %+v",
instance)
+ return nil
+ }
+
+ ret := &model.Endpoint{
+ Address: model.SocketAddress{},
+ LLMMeta: &model.LLMMeta{},
+ }
+
+ err := defaults.Set(ret)
+ if err != nil {
+ logger.Warnf("Failed to set default values for endpoint: %v",
err)
+ return nil
+ }
+
+ if ip, ok := instance.Metadata["ip"]; ok {
+ ret.Address.Address = ip
+ }
+
+ if port, ok := instance.Metadata["port"]; ok {
+ p, err := strconv.Atoi(port)
+ if err != nil {
+ logger.Warnf("Invalid port in metadata: %s, error: %v",
port, err)
+ }
+ ret.Address.Port = p
+ }
+
+ if id, ok := instance.Metadata["id"]; ok {
+ ret.ID = id
+ } else {
+ ret.ID, _ = uuid.GenerateUUID()
+ }
+
+ if name, ok := instance.Metadata["name"]; ok {
+ ret.Name = name
+ }
+ if address, ok := instance.Metadata["address"]; ok {
+ ret.Address.Domains = strings.Split(address, ",")
+ }
+ if apiKey, ok := instance.Metadata["llm-meta.api_key"]; ok {
+ ret.LLMMeta.APIKey = apiKey
+ }
+ if retryPolicy, ok := instance.Metadata["llm-meta.retry_policy.name"];
ok {
+ ret.LLMMeta.RetryPolicy.Name = model.RetryTypeValue[retryPolicy]
+ }
+ if retryConfig, ok :=
instance.Metadata["llm-meta.retry_policy.config"]; ok {
+ err := json.Unmarshal([]byte(retryConfig),
&ret.LLMMeta.RetryPolicy.Config)
+ if err != nil {
+ logger.Warnf("Failed to parse retry policy config JSON:
%s, error: %v", retryConfig, err)
+ }
+ }
+ if fallback, ok := instance.Metadata["llm-meta.fallback"]; ok {
+ ret.LLMMeta.Fallback =
strings.ToLower(strings.TrimSpace(fallback)) == "true"
+ }
+
+ ret.Metadata = instance.Metadata
+
+ return ret
+}
+
+func generateInstance(ss nacosModel.SubscribeService) nacosModel.Instance {
+ return nacosModel.Instance{
+ InstanceId: ss.InstanceId,
+ Ip: ss.Ip,
+ Port: ss.Port,
+ ServiceName: ss.ServiceName,
+ Valid: ss.Valid,
+ Enable: ss.Enable,
+ Weight: ss.Weight,
+ Metadata: ss.Metadata,
+ ClusterName: ss.ClusterName,
+ Healthy: ss.Healthy,
+ }
+}
diff --git a/pkg/adapter/llmregistry/registry/nacos/listener_test.go
b/pkg/adapter/llmregistry/registry/nacos/listener_test.go
new file mode 100644
index 00000000..6835f7d3
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/listener_test.go
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "errors"
+ "sync"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosModel "github.com/nacos-group/nacos-sdk-go/model"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+type mockNacosClient struct {
+ naming_client.INamingClient
+
+ mu sync.Mutex
+ servicesToReturn nacosModel.ServiceList
+ servicesToReturnErr error
+ subscribeCallback func(services []nacosModel.SubscribeService, err
error)
+ subscribedServices map[string]struct{}
+ unsubscribedServices map[string]struct{}
+}
+
+func newMockNacosClient() *mockNacosClient {
+ return &mockNacosClient{
+ subscribedServices: make(map[string]struct{}),
+ unsubscribedServices: make(map[string]struct{}),
+ }
+}
+
+func (m *mockNacosClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam)
(nacosModel.ServiceList, error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.servicesToReturn, m.servicesToReturnErr
+}
+
+func (m *mockNacosClient) Subscribe(param *vo.SubscribeParam) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.subscribedServices[param.ServiceName] = struct{}{}
+ m.subscribeCallback = param.SubscribeCallback
+ return nil
+}
+
+func (m *mockNacosClient) Unsubscribe(param *vo.SubscribeParam) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.unsubscribedServices[param.ServiceName] = struct{}{}
+ return nil
+}
+
+type mockAdapterListener struct {
+ mu sync.Mutex
+ addedEndpoints map[string]*model.Endpoint
+ removedEndpoints map[string]*model.Endpoint
+}
+
+var _ common.RegistryEventListener = (*mockAdapterListener)(nil)
+
+func newMockAdapterListener() *mockAdapterListener {
+ return &mockAdapterListener{
+ addedEndpoints: make(map[string]*model.Endpoint),
+ removedEndpoints: make(map[string]*model.Endpoint),
+ }
+}
+
+func (m *mockAdapterListener) OnAddEndpoint(endpoint *model.Endpoint) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.addedEndpoints[endpoint.ID] = endpoint
+ return nil
+}
+
+func (m *mockAdapterListener) OnRemoveEndpoint(endpoint *model.Endpoint) error
{
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.removedEndpoints[endpoint.ID] = endpoint
+ return nil
+}
+
+func (m *mockAdapterListener) reset() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.addedEndpoints = make(map[string]*model.Endpoint)
+ m.removedEndpoints = make(map[string]*model.Endpoint)
+}
+
+func testSetup() (*listener, *mockNacosClient, *mockAdapterListener) {
+ client := newMockNacosClient()
+ adapterListener := newMockAdapterListener()
+ regConf := &model.Registry{Group: "test_group", Namespace:
"test_namespace"}
+ nacosListener := newNacosListener(client, regConf, adapterListener)
+ return nacosListener, client, adapterListener
+}
+
+func TestGenerateEndpoint(t *testing.T) {
+ t.Run("Full valid metadata", func(t *testing.T) {
+ instance := nacosModel.Instance{
+ Metadata: map[string]string{
+ "id": "ep-123",
+ "name": "my-llm",
+ "ip": "127.0.0.1",
+ "port": "8080",
+ "address":
"openai.com,openai1.com",
+ "llm-meta.retry_policy.name":
"ExponentialBackoff",
+ "llm-meta.fallback": "true",
+ },
+ }
+
+ endpoint := generateEndpoint(instance)
+ assert.NotNil(t, endpoint)
+ assert.Equal(t, "ep-123", endpoint.ID)
+ assert.Equal(t, "my-llm", endpoint.Name)
+ assert.Equal(t, "127.0.0.1", endpoint.Address.Address)
+ assert.Equal(t, 8080, endpoint.Address.Port)
+ assert.Equal(t, 2, len(endpoint.Address.Domains))
+ assert.Equal(t, "openai.com", endpoint.Address.Domains[0])
+ assert.Equal(t, "openai1.com", endpoint.Address.Domains[1])
+ assert.Equal(t, model.RetryerExponentialBackoff,
endpoint.LLMMeta.RetryPolicy.Name)
+ assert.True(t, endpoint.LLMMeta.Fallback)
+ })
+
+ t.Run("Nil metadata", func(t *testing.T) {
+ instance := nacosModel.Instance{Metadata: nil}
+ endpoint := generateEndpoint(instance)
+ assert.Nil(t, endpoint)
+ })
+
+ t.Run("Invalid port", func(t *testing.T) {
+ instance := nacosModel.Instance{Metadata:
map[string]string{"port": "not-a-number"}}
+ endpoint := generateEndpoint(instance)
+ assert.NotNil(t, endpoint)
+ assert.Equal(t, 0, endpoint.Address.Port)
+ })
+}
+
+func TestDiscoverAndSubscribe(t *testing.T) {
+ l, client, _ := testSetup()
+
+ t.Run("Discover and subscribe to a new service", func(t *testing.T) {
+ // CHANGE THIS LINE:
+ client.servicesToReturn = nacosModel.ServiceList{Doms:
[]string{"service-A"}} // Was nacosModel.Service
+ l.discoverAndSubscribe()
+
+ assert.Equal(t, struct{}{},
client.subscribedServices["service-A"], "Should subscribe to service-A")
+ _, loaded := l.subscribedServices.Load("service-A")
+ assert.True(t, loaded, "service-A should be in the
subscribedServices map")
+ })
+
+ t.Run("Unsubscribe from a removed service", func(t *testing.T) {
+ // Ensure service-A is already subscribed for the test setup
+ l.subscribedServices.Store("service-A", true)
+ client.servicesToReturn = nacosModel.ServiceList{Doms:
[]string{}} // Nacos now returns an empty list
+
+ l.discoverAndSubscribe()
+
+ assert.Equal(t, struct{}{},
client.unsubscribedServices["service-A"], "Should unsubscribe from service-A")
+ _, loaded := l.subscribedServices.Load("service-A")
+ assert.False(t, loaded, "service-A should be removed from
subscribedServices map")
+ })
+
+ t.Run("Handle Nacos API error", func(t *testing.T) {
+ client.subscribedServices = make(map[string]struct{})
+ l.subscribedServices.Store("stale-service", true)
+
+ client.servicesToReturnErr = errors.New("Nacos unavailable")
+ l.discoverAndSubscribe()
+
+ _, loaded := l.subscribedServices.Load("stale-service")
+ assert.True(t, loaded, "Should not change subscriptions on API
error")
+ assert.Empty(t, client.subscribedServices, "Should not attempt
to subscribe on API error")
+ })
+}
+
+func TestServiceCallback(t *testing.T) {
+ l, client, adapterListener := testSetup()
+
+ _ = client.Subscribe(&vo.SubscribeParam{
+ ServiceName: "service-A",
+ SubscribeCallback: l.serviceCallback,
+ })
+
+ instance1 := nacosModel.SubscribeService{
+ InstanceId: "ep-1", ServiceName: "service-A", Enable: true,
Healthy: true,
+ Metadata: map[string]string{"id": "ep-1", "name": "inst-1"},
+ }
+ instance2 := nacosModel.SubscribeService{
+ InstanceId: "ep-2", ServiceName: "service-A", Enable: true,
Healthy: true,
+ Metadata: map[string]string{"id": "ep-2", "name": "inst-2"},
+ }
+
+ t.Run("Initial instance registration", func(t *testing.T) {
+ adapterListener.reset()
+
client.subscribeCallback([]nacosModel.SubscribeService{instance1, instance2},
nil)
+
+ assert.Len(t, adapterListener.addedEndpoints, 2, "Should add 2
endpoints")
+ assert.Contains(t, adapterListener.addedEndpoints, "ep-1")
+ assert.Contains(t, adapterListener.addedEndpoints, "ep-2")
+ assert.Empty(t, adapterListener.removedEndpoints, "Should not
remove any endpoints")
+ })
+
+ t.Run("One instance is removed", func(t *testing.T) {
+ adapterListener.reset()
+
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+ assert.Empty(t, adapterListener.addedEndpoints, "Should not add
any new endpoints")
+ assert.Len(t, adapterListener.removedEndpoints, 1, "Should
remove 1 endpoint")
+ assert.Contains(t, adapterListener.removedEndpoints, "ep-2")
+ })
+
+ t.Run("One instance is updated", func(t *testing.T) {
+ adapterListener.reset()
+ updatedInstance1 := instance1
+ updatedInstance1.Metadata = map[string]string{"id": "ep-1",
"name": "inst-1-updated"}
+
+
client.subscribeCallback([]nacosModel.SubscribeService{updatedInstance1}, nil)
+
+ assert.Len(t, adapterListener.addedEndpoints, 1, "Should fire
an add/update event for 1 endpoint")
+ assert.Contains(t, adapterListener.addedEndpoints, "ep-1")
+ assert.Equal(t, "inst-1-updated",
adapterListener.addedEndpoints["ep-1"].Name)
+ assert.Empty(t, adapterListener.removedEndpoints, "Should not
remove any endpoints")
+ })
+
+ t.Run("Filter unhealthy or disabled instances", func(t *testing.T) {
+ adapterListener.reset()
+ unhealthyInstance := instance1
+ unhealthyInstance.Healthy = false
+ disabledInstance := instance2
+ disabledInstance.Enable = false
+
+
client.subscribeCallback([]nacosModel.SubscribeService{unhealthyInstance,
disabledInstance}, nil)
+
+ assert.Empty(t, adapterListener.addedEndpoints, "Should not add
unhealthy/disabled endpoints")
+ assert.Len(t, adapterListener.removedEndpoints, 1, "Should
remove the previously active endpoints")
+ })
+
+ t.Run("No changes in instances", func(t *testing.T) {
+
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+ adapterListener.reset()
+
+
client.subscribeCallback([]nacosModel.SubscribeService{instance1}, nil)
+
+ assert.Empty(t, adapterListener.addedEndpoints, "Should not
trigger add for unchanged instance")
+ assert.Empty(t, adapterListener.removedEndpoints, "Should not
trigger remove for unchanged instance")
+ })
+}
+
+func TestLifecycle(t *testing.T) {
+ l, _, _ := testSetup()
+
+ l.WatchAndHandle()
+
+ time.Sleep(100 * time.Millisecond)
+
+ // test that Close works without panic
+ assert.NotPanics(t, func() {
+ l.Close()
+ })
+}
diff --git a/pkg/adapter/llmregistry/registry/nacos/registry.go
b/pkg/adapter/llmregistry/registry/nacos/registry.go
new file mode 100644
index 00000000..8ab6c8e4
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/nacos/registry.go
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nacos
+
+import (
+ "github.com/nacos-group/nacos-sdk-go/clients"
+ "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
+ "github.com/nacos-group/nacos-sdk-go/vo"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+ baseRegistry
"github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry/base"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/util/stringutil"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+func init() {
+ registry.SetRegistry(constant.Nacos, newNacosRegistry)
+}
+
+type NacosRegistry struct {
+ *baseRegistry.BaseRegistry
+ nacosListener *listener
+ client naming_client.INamingClient
+}
+
+func (n *NacosRegistry) DoSubscribe() error {
+ go n.nacosListener.WatchAndHandle()
+ return nil
+}
+
+func (n *NacosRegistry) DoUnsubscribe() error {
+ n.nacosListener.Close()
+ return nil
+}
+
+var _ registry.Registry = new(NacosRegistry)
+
+func newNacosRegistry(regConfig model.Registry, adapterListener
common.RegistryEventListener) (registry.Registry, error) {
+ addrs, err := stringutil.GetIPAndPort(regConfig.Address)
+ if err != nil {
+ return nil, err
+ }
+
+ scs := make([]nacosConstant.ServerConfig, 0, len(addrs))
+ for _, addr := range addrs {
+ scs = append(scs, nacosConstant.ServerConfig{
+ IpAddr: addr.IP.String(),
+ Port: uint64(addr.Port),
+ })
+ }
+
+ ccs := nacosConstant.NewClientConfig(
+ nacosConstant.WithNamespaceId(regConfig.Namespace),
+ nacosConstant.WithUsername(regConfig.Username),
+ nacosConstant.WithPassword(regConfig.Password),
+ nacosConstant.WithNotLoadCacheAtStart(true),
+ nacosConstant.WithUpdateCacheWhenEmpty(true))
+
+ client, err := clients.NewNamingClient(vo.NacosClientParam{
+ ServerConfigs: scs,
+ ClientConfig: ccs,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ nacosRegistry := &NacosRegistry{
+ client: client,
+ }
+ nacosRegistry.BaseRegistry =
baseRegistry.NewBaseRegistry(nacosRegistry, adapterListener)
+ nacosRegistry.nacosListener = newNacosListener(client, ®Config,
adapterListener)
+
+ return nacosRegistry, nil
+}
diff --git a/pkg/adapter/llmregistry/registry/registry.go
b/pkg/adapter/llmregistry/registry/registry.go
new file mode 100644
index 00000000..6282184b
--- /dev/null
+++ b/pkg/adapter/llmregistry/registry/registry.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+)
+
+// A map to store registry creation functions by protocol name.
+var registryMap = make(map[string]func(model.Registry,
common.RegistryEventListener) (Registry, error), 8)
+
// Registry is a client for one service registry that feeds discovered LLM
// services into the gateway.
type Registry interface {
	// Subscribe begins watching the registry for service changes.
	Subscribe() error
	// Unsubscribe stops watching the registry.
	Unsubscribe() error
}
+
+// SetRegistry registers a factory function for creating a new registry client.
+func SetRegistry(name string, newRegFunc func(model.Registry,
common.RegistryEventListener) (Registry, error)) {
+ registryMap[name] = newRegFunc
+}
+
+// GetRegistry creates and returns a new registry client based on the
configuration.
+// It panics if the registry client cannot be initialized.
+func GetRegistry(regConfig model.Registry, listener
common.RegistryEventListener) (Registry, error) {
+ if newRegFunc, ok := registryMap[regConfig.Protocol]; ok {
+ reg, err := newRegFunc(regConfig, listener)
+ if err != nil {
+ return nil, errors.New("Initialize Registry " +
regConfig.Protocol + " failed due to: " + err.Error())
+ }
+ return reg, nil
+ }
+ return nil, errors.New("Registry protocol " + regConfig.Protocol + " is
not supported")
+}
diff --git a/pkg/adapter/llmregistry/registrycenter.go
b/pkg/adapter/llmregistry/registrycenter.go
new file mode 100644
index 00000000..6d632cba
--- /dev/null
+++ b/pkg/adapter/llmregistry/registrycenter.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package llmregistry
+
+import (
+ "os"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/common"
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry"
+ _
"github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry/registry/nacos"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/adapter"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+ "github.com/apache/dubbo-go-pixiu/pkg/server"
+)
+
+const (
+ Kind = constant.LLMRegistryCenterAdapter
+)
+
+func init() {
+ adapter.RegisterAdapterPlugin(&Plugin{})
+}
+
+var (
+ _ adapter.AdapterPlugin = new(Plugin)
+ _ adapter.Adapter = new(Adapter)
+ // Explicitly declare that Adapter implements the listener interface.
+ _ common.RegistryEventListener = new(Adapter)
+)
+
+type (
+ // Plugin to monitor LLM services on a registry center.
+ Plugin struct{}
+
+ // AdapterConfig defines the configuration for the LLM registry adapter.
+ AdapterConfig struct {
+ Registries map[string]model.Registry `yaml:"registries"
json:"registries" mapstructure:"registries"`
+ }
+)
+
+// Kind returns the identifier of the plugin.
+func (p *Plugin) Kind() string {
+ return Kind
+}
+
+// CreateAdapter returns the LLM registry center adapter.
+func (p *Plugin) CreateAdapter(a *model.Adapter) (adapter.Adapter, error) {
+ adapter := &Adapter{
+ id: a.ID,
+ registries: make(map[string]registry.Registry),
+ cfg: &AdapterConfig{Registries:
make(map[string]model.Registry)},
+ }
+ return adapter, nil
+}
+
+// Adapter to monitor LLM services on a registry center.
+type Adapter struct {
+ id string
+ cfg *AdapterConfig
+ registries map[string]registry.Registry
+}
+
+// Start starts the adapter by subscribing to all configured registries.
+func (a *Adapter) Start() {
+ for name, reg := range a.registries {
+ logger.Infof("Subscribing to LLM registry: %s", name)
+ if err := reg.Subscribe(); err != nil {
+ logger.Errorf("Failed to subscribe to registry %s: %s",
name, err.Error())
+ }
+ }
+}
+
+// Stop stops the adapter by unsubscribing from all registries.
+func (a *Adapter) Stop() {
+ for name, reg := range a.registries {
+ if err := reg.Unsubscribe(); err != nil {
+ logger.Errorf("Failed to unsubscribe from registry %s:
%s", name, err.Error())
+ }
+ }
+}
+
+// Apply initializes the registries according to the configuration.
+func (a *Adapter) Apply() error {
+ nacosAddrFromEnv :=
os.Getenv(constant.EnvDubbogoPixiuNacosRegistryAddress)
+
+ for key, registryConfig := range a.cfg.Registries {
+ var err error
+ // Override address from environment variable if it's set for
Nacos.
+ if nacosAddrFromEnv != "" && registryConfig.Protocol ==
constant.Nacos {
+ logger.Infof("Overriding Nacos address for registry
'%s' with environment variable: %s", key, nacosAddrFromEnv)
+ registryConfig.Address = nacosAddrFromEnv
+ }
+
+ a.registries[key], err = registry.GetRegistry(registryConfig, a)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// Config returns the configuration of the adapter.
+func (a *Adapter) Config() any {
+ return a.cfg
+}
+
+// OnAddEndpoint is the callback that gets triggered when a new LLM endpoint
is discovered.
+func (a *Adapter) OnAddEndpoint(endpoint *model.Endpoint) error {
+ // The endpoint metadata MUST contain a "cluster" key to identify the
target cluster.
+ clusterName, ok := endpoint.Metadata["cluster"]
+ if !ok || clusterName == "" {
+ logger.Warnf("Endpoint %s (ID: %s) is missing 'cluster'
metadata, skipping.", endpoint.Name, endpoint.ID)
+ return nil
+ }
+ logger.Infof("Adding endpoint %s to cluster %s",
endpoint.ID+constant.PathParamIdentifier+endpoint.Name, clusterName)
+ server.GetClusterManager().SetEndpoint(clusterName, endpoint)
+ return nil
+}
+
+// OnRemoveEndpoint is the callback that gets triggered when an LLM endpoint
is removed.
+func (a *Adapter) OnRemoveEndpoint(endpoint *model.Endpoint) error {
+ // The endpoint metadata MUST contain a "cluster" key.
+ clusterName, ok := endpoint.Metadata["cluster"]
+ if !ok || clusterName == "" {
+ logger.Warnf("Endpoint %s (ID: %s) is missing 'cluster'
metadata for removal, skipping.", endpoint.Name, endpoint.ID)
+ return nil
+ }
+ logger.Infof("Removing endpoint ID %s from cluster %s", endpoint.ID,
clusterName)
+ server.GetClusterManager().DeleteEndpoint(clusterName, endpoint.ID)
+ return nil
+}
diff --git a/pkg/cluster/retry/countbased/count_based.go
b/pkg/cluster/retry/countbased/count_based.go
index 7746b215..7ab33c04 100644
--- a/pkg/cluster/retry/countbased/count_based.go
+++ b/pkg/cluster/retry/countbased/count_based.go
@@ -19,6 +19,7 @@ package countbased
import (
"fmt"
+ "strconv"
)
import (
@@ -58,11 +59,31 @@ func newCountBasedRetry(config map[string]any)
(retry.RetryPolicy, error) {
timesValue = defaultRetryTimes
}
- timesUint, ok := timesValue.(int)
- if !ok {
- return nil, fmt.Errorf("invalid type for
'retry.count_based.times', expected int but got %T", timesValue)
+ var times int
+
+ switch v := timesValue.(type) {
+ case int:
+ times = v
+ case float64:
+ if v != float64(int(v)) {
+ return nil, fmt.Errorf("invalid float value for '%s',
must be a whole number, but got %f", retryTimesKey, v)
+ }
+ times = int(v)
+ case string:
+ parsedTimes, err := strconv.Atoi(v)
+ if err != nil {
+ return nil, fmt.Errorf("could not parse string value
for '%s': %w", retryTimesKey, err)
+ }
+ times = parsedTimes
+ default:
+ return nil, fmt.Errorf("invalid type for '%s', expected a
number or a numeric string, but got %T", retryTimesKey, timesValue)
+ }
+
+ // must be non-negative
+ if times < 0 {
+ return nil, fmt.Errorf("value for '%s' cannot be negative, but
got %d", retryTimesKey, times)
}
// Total attempts = 1 initial try plus number of retries.
- return &CountBasedRetry{MaxAttempts: uint(timesUint) + 1}, nil
+ return &CountBasedRetry{MaxAttempts: uint(times) + 1}, nil
}
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index 0400250c..b105e98e 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -66,6 +66,7 @@ const (
const (
SpringCloudAdapter = "dgp.adapter.springcloud"
DubboRegistryCenterAdapter = "dgp.adapter.dubboregistrycenter"
+ LLMRegistryCenterAdapter = "dgp.adapter.llmregistrycenter" // adapter kind name for the LLM registry center adapter
McpServerAdapter = "dgp.adapter.mcpserver"
)
diff --git a/pkg/common/constant/url.go b/pkg/common/constant/url.go
index 14aeeef0..0e205c9b 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/common/constant/url.go
@@ -29,4 +29,5 @@ const (
// Dot defines the . which will be used to present the path to specific
field in the body
Dot = "."
AnyValue = "*"
+ At = "@"
)
diff --git a/pkg/model/cluster.go b/pkg/model/cluster.go
index 33be18a8..b35b7b24 100644
--- a/pkg/model/cluster.go
+++ b/pkg/model/cluster.go
@@ -92,10 +92,10 @@ type (
ID string `yaml:"ID" json:"ID"`
// ID indicate one endpoint
Name string `yaml:"name" json:"name"`
// Name the endpoint unique name
Address SocketAddress `yaml:"socket_address"
json:"socket_address" mapstructure:"socket_address"` // Address socket address
- Metadata map[string]string `yaml:"meta" json:"meta"`
// Metadata extra info such as label or
other meta data
+ Metadata map[string]string `yaml:"meta" json:"meta"
mapstructure:"meta"` // Metadata extra info such
as label or other meta data
UnHealthy bool
- LLMMeta *LLMMeta `yaml:"llm_meta" json:"llm_meta"` // LLMMeta
extra info such as label or other meta data
+ LLMMeta *LLMMeta `yaml:"llm_meta" json:"llm_meta"
mapstructure:"llm_meta"` // LLMMeta extra info such as label or other meta data
}
// ConsistentHash methods include: RingHash, MaglevHash
diff --git a/pkg/pluginregistry/registry.go b/pkg/pluginregistry/registry.go
index 7fc267ea..2b3de88e 100644
--- a/pkg/pluginregistry/registry.go
+++ b/pkg/pluginregistry/registry.go
@@ -19,6 +19,7 @@ package pluginregistry
import (
_ "github.com/apache/dubbo-go-pixiu/pkg/adapter/dubboregistry"
+ _ "github.com/apache/dubbo-go-pixiu/pkg/adapter/llmregistry"
_ "github.com/apache/dubbo-go-pixiu/pkg/adapter/mcpserver"
_ "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud"
_ "github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer/maglev"
diff --git a/pkg/server/adapter_manager.go b/pkg/server/adapter_manager.go
index 8276d026..72d0c6d1 100644
--- a/pkg/server/adapter_manager.go
+++ b/pkg/server/adapter_manager.go
@@ -78,7 +78,7 @@ func (am *AdapterManager) initAdapters() {
err = hf.Apply()
if err != nil {
- logger.Error("initFilterIfNeed apply adapter error %s",
err)
+ logger.Errorf("initFilterIfNeed apply adapter error
%s", err)
}
ads = append(ads, hf)
}
diff --git a/pkg/server/cluster_manager.go b/pkg/server/cluster_manager.go
index 599730d6..69ea4ecb 100644
--- a/pkg/server/cluster_manager.go
+++ b/pkg/server/cluster_manager.go
@@ -19,11 +19,14 @@ package server
import (
"fmt"
- "strconv"
"sync"
"sync/atomic"
)
+import (
+ "github.com/hashicorp/go-uuid"
+)
+
import (
"github.com/apache/dubbo-go-pixiu/pkg/cluster"
"github.com/apache/dubbo-go-pixiu/pkg/cluster/loadbalancer"
@@ -257,18 +260,18 @@ func (s *ClusterStore) AddCluster(c *model.ClusterConfig)
{
atomic.AddInt32(&clusterIndex, 1)
}
- s.AssembleClusterEndpoints(c)
+ s.assembleClusterEndpoints(c)
s.Config = append(s.Config, c)
s.clustersMap[c.Name] = cluster.NewCluster(c)
c.CreateConsistentHash()
}
-// AssembleClusterEndpoints assembles the cluster endpoints
+// assembleClusterEndpoints assembles the cluster endpoints
// by formatting the ID, name and domains for each endpoint
// If endpoint.LLMMeta is not nil, the assimilation of name and domain is
based on
// the LLM provider denoted in the endpoint LLMMeta.
-func (s *ClusterStore) AssembleClusterEndpoints(c *model.ClusterConfig) {
+func (s *ClusterStore) assembleClusterEndpoints(c *model.ClusterConfig) {
if c == nil {
return
}
@@ -276,7 +279,7 @@ func (s *ClusterStore) AssembleClusterEndpoints(c
*model.ClusterConfig) {
for i, endpoint := range c.Endpoints {
// If the endpoint ID is not set, set it to the index + 1
if endpoint.ID == "" {
- endpoint.ID = strconv.Itoa(i + 1)
+ endpoint.ID, _ = uuid.GenerateUUID()
}
// If the endpoint has no name, set a default name