This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 5ac91629 feat(grpc-proxy): add gRPC Server Reflection support (#821)
(#849)
5ac91629 is described below
commit 5ac916297109daabdad749c0f53530788e64bbef
Author: Tsukikage <[email protected]>
AuthorDate: Fri Jan 9 21:56:13 2026 +0800
feat(grpc-proxy): add gRPC Server Reflection support (#821) (#849)
Add gRPC reflection mode to enable content-aware proxying:
- Support three modes: passthrough, reflection, hybrid
- Add descriptor caching with TTL and LRU eviction
- Add dynamic codec for message inspection
- Fix unary call deadlock by detecting stream type
---
docs/user/filter/grpcproxy.md | 511 +++++++++++++++
docs/user/filter/grpcproxy_CN.md | 511 +++++++++++++++
.../grpcproxy/filter/proxy/boundary_test.go | 394 ++++++++++++
.../grpcproxy/filter/proxy/descriptor_cache.go | 301 +++++++++
.../filter/proxy/descriptor_cache_test.go | 170 +++++
.../grpcproxy/filter/proxy/dynamic_codec.go | 235 +++++++
.../grpcproxy/filter/proxy/dynamic_codec_test.go | 559 ++++++++++++++++
.../grpcproxy/filter/proxy/grpc_proxy_filter.go | 189 +++++-
.../filter/proxy/grpc_proxy_filter_test.go | 193 ++++++
.../network/grpcproxy/filter/proxy/protocol.go | 51 ++
.../grpcproxy/filter/proxy/protocol_test.go | 122 ++++
.../grpcproxy/filter/proxy/reflection_manager.go | 709 +++++++++++++++++++++
.../filter/proxy/reflection_manager_test.go | 463 ++++++++++++++
13 files changed, 4376 insertions(+), 32 deletions(-)
diff --git a/docs/user/filter/grpcproxy.md b/docs/user/filter/grpcproxy.md
new file mode 100644
index 00000000..af13d4de
--- /dev/null
+++ b/docs/user/filter/grpcproxy.md
@@ -0,0 +1,511 @@
+# gRPC Proxy Filter (dgp.filter.grpc.proxy)
+
+English | [中文](grpcproxy_CN.md)
+
+---
+
+## Overview
+
+The `dgp.filter.grpc.proxy` filter provides gRPC proxy functionality for Pixiu
gateway with support for **gRPC Server Reflection**. This feature enables
dynamic message parsing and inspection at the gateway level without requiring
pre-compiled proto files.
+
+### Key Features
+
+- **Three Reflection Modes**: Choose between performance and functionality
based on your needs
+- **Dynamic Message Decoding**: Parse and inspect gRPC messages at runtime
without proto files
+- **TTL-based Caching**: Efficient caching of method descriptors with
automatic cleanup
+- **Protocol Detection**: Support for both gRPC and Triple protocol
compatibility
+- **Graceful Fallback**: Hybrid mode provides automatic fallback to passthrough
+
+---
+
+## Reflection Modes
+
+### Passthrough Mode (Default)
+
+Performs transparent binary proxying without decoding messages.
+
+**Characteristics:**
+- Highest performance (no message parsing overhead)
+- No content inspection capabilities
+- Pure binary forwarding
+
+**Use Cases:**
+- High-throughput scenarios where message inspection is not needed
+- Simple routing based on service/method names only
+- When backward compatibility is critical
+
+**Configuration:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "passthrough" # or omit (default)
+```
+
+### Reflection Mode
+
+Uses gRPC Server Reflection API to dynamically decode and inspect message
contents.
+
+**Characteristics:**
+- Full message decoding at runtime
+- Enables content-based routing and filtering
+- Requires backend server to support reflection
+- Slight performance overhead due to reflection calls
+
+**Use Cases:**
+- Content-aware routing (route based on message fields)
+- Field-level filtering or transformation
+- Logging and debugging with full message inspection
+- API gateway scenarios requiring message validation
+
+**Configuration:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ descriptor_cache_ttl: 300 # 5 minutes cache
+```
+
+### Hybrid Mode
+
+Tries reflection first, falls back to passthrough on failure.
+
+**Characteristics:**
+- Best of both worlds: content inspection when available
+- Graceful degradation for services without reflection
+- Reflection timeout prevents blocking
+
+**Use Cases:**
+- Mixed environments with varying reflection support
+- Migration scenarios (gradually enabling reflection)
+- Production environments requiring high availability
+
+**Configuration:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "hybrid"
+ reflection_timeout: 5s # Max time to wait for reflection
+```
+
+---
+
+## Configuration
+
+### Complete Configuration Example
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-gateway"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # Reflection mode (default: "passthrough")
+ reflection_mode: "reflection"
+
+ # Cache TTL for method descriptors (default: 300s)
+ descriptor_cache_ttl: 300
+
+ # Enable Triple protocol detection (default: false)
+ enable_protocol_detection: true
+
+ # Reflection timeout for hybrid mode (default: 5s)
+ reflection_timeout: 5s
+
+ # TLS configuration (optional)
+ enable_tls: false
+ tls_cert_file: ""
+ tls_key_file: ""
+
+ # Connection settings (optional)
+ keepalive_time: 300s
+ keepalive_timeout: 5s
+ connect_timeout: 5s
+ max_concurrent_streams: 0
+
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+```
+
+### Configuration Fields
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `reflection_mode` | string | `"passthrough"` | Reflection mode:
`"passthrough"`, `"reflection"`, or `"hybrid"` |
+| `descriptor_cache_ttl` | int | `300` | Cache TTL for method descriptors in
seconds |
+| `enable_protocol_detection` | bool | `false` | Enable Triple protocol
detection |
+| `reflection_timeout` | string | `"5s"` | Max time to wait for reflection in
hybrid mode |
+| `enable_tls` | bool | `false` | Enable TLS for backend connections |
+| `tls_cert_file` | string | `""` | Path to TLS certificate file |
+| `tls_key_file` | string | `""` | Path to TLS key file |
+| `keepalive_time` | string | `"300s"` | Keepalive time for backend
connections |
+| `keepalive_timeout` | string | `"5s"` | Keepalive timeout |
+| `connect_timeout` | string | `"5s"` | Connection timeout |
+| `max_concurrent_streams` | uint32 | `0` (unlimited) | Max concurrent streams
|
+
+---
+
+## Enabling Server Reflection
+
+To use `reflection` or `hybrid` modes, your backend gRPC server must have
Server Reflection enabled.
+
+### Go (gRPC-Go)
+
+```go
+import (
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+func main() {
+ server := grpc.NewServer()
+
+ // Register your service
+ echo.RegisterEchoServiceServer(server, &echoServer{})
+
+ // Enable server reflection (IMPORTANT!)
+ reflection.Register(server)
+
+ // Start server
+ lis, _ := net.Listen("tcp", ":50051")
+ server.Serve(lis)
+}
+```
+
+### Java (gRPC-Java)
+
+```java
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
+
+public class EchoServer {
+ public static void main(String[] args) throws Exception {
+ Server server = ServerBuilder
+ .forPort(50051)
+ .addService(new EchoServiceImpl())
+ // Enable server reflection
+ .addService(ServerReflectionGrpc.newInstance())
+ .build();
+
+ server.start();
+ server.awaitTermination();
+ }
+}
+```
+
+### Python (gRPC-Python)
+
+```python
+import grpc
+from grpc_reflection.v1alpha import reflection
+from concurrent import futures
+
+class EchoService(echo_pb2_grpc.EchoServiceServicer):
+ # ... implementation ...
+
+def serve():
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
+
+ # Enable server reflection
+ reflection.enable_server_reflection(
+
service_names=[echo.DESCRIPTOR.services_by_name['EchoService'].full_name,
+ reflection.SERVICE_NAME],
+ server=server
+ )
+
+ server.add_insecure_port('[::]:50051')
+ server.start()
+ server.wait_for_termination()
+```
+
+---
+
+## Mode Comparison
+
+| Feature | Passthrough | Reflection | Hybrid |
+|---------|-------------|------------|--------|
+| **Performance** | ⭐⭐⭐⭐⭐ (Best) | ⭐⭐⭐ (Good) | ⭐⭐⭐⭐ (Better) |
+| **Message Inspection** | ❌ No | ✅ Yes | ✅ Yes (when available) |
+| **Requires Reflection** | ❌ No | ✅ Yes | ❌ Optional |
+| **Fallback** | N/A | ❌ No | ✅ Yes |
+| **Use Case** | High-performance proxy | Content-aware routing | Mixed
environments |
+
+---
+
+## Descriptor Cache
+
+The reflection mode uses a TTL-based cache to store method descriptors
retrieved from the backend server. This reduces the number of reflection
requests and improves performance.
+
+### Cache Behavior
+
+| Operation | Description |
+|-----------|-------------|
+| **Cache Hit** | Return cached descriptor immediately |
+| **Cache Miss** | Fetch descriptor via reflection API, cache it |
+| **Expiration** | Descriptors expire after TTL, re-fetched on next access |
+| **Eviction** | LRU eviction when cache is full |
+
+### Cache Configuration
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ # Cache TTL in seconds (default: 300)
+ descriptor_cache_ttl: 300
+```
+
+**Recommended TTL Values:**
+- Development: `60` (1 minute) - frequent updates
+- Testing: `300` (5 minutes) - balanced
+- Production: `1800` (30 minutes) - stable services
+
+---
+
+## Triple Protocol Detection
+
+Pixiu supports the **Dubbo Triple protocol**, a gRPC-compatible protocol
developed by the Apache Dubbo community.
+
+### What is Triple Protocol?
+
+Triple is a gRPC-compatible protocol that:
+- Uses HTTP/2 like gRPC
+- Supports Protobuf serialization
+- Adds Triple-specific metadata headers (`tri-*`)
+- Enables gRPC services to work with Dubbo ecosystem
+
+### Enabling Protocol Detection
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ # Enable Triple protocol detection
+ enable_protocol_detection: true
+```
+
+**When enabled:**
+- Pixiu automatically detects Triple vs gRPC protocol
+- Extracts Triple-specific metadata (`tri-service-version`, `tri-group`, etc.)
+- Ensures compatibility with Dubbo Triple services
+
+---
+
+## Usage Examples
+
+### Example 1: Simple Passthrough Proxy
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-proxy"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/"
+ route:
+ cluster: "backend-grpc"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config: {} # Default passthrough mode
+ clusters:
+ - name: "backend-grpc"
+ endpoints:
+ - socket_address:
+ address: backend.example.com
+ port: 50051
+```
+
+### Example 2: Content-Aware Routing with Reflection
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-gateway"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/myapi/"
+ route:
+ cluster: "api-v1"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ descriptor_cache_ttl: 600
+```
+
+### Example 3: Hybrid Mode for Migration
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # Try reflection first, fallback to passthrough
+ reflection_mode: "hybrid"
+ reflection_timeout: 3s
+ enable_protocol_detection: true
+```
+
+---
+
+## Performance Considerations
+
+### Mode Performance
+
+| Mode | Latency Impact | Throughput | Memory Usage |
+|------|----------------|------------|--------------|
+| Passthrough | Minimal | Highest | Lowest |
+| Reflection | +10-20% | High | Moderate (cache) |
+| Hybrid | +5-10% | Higher | Moderate |
+
+### Optimization Tips
+
+1. **Use Passthrough** when message inspection is not needed
+2. **Enable Caching** with appropriate TTL for reflection mode
+3. **Set Reasonable Timeout** for hybrid mode to prevent blocking
+4. **Monitor Cache Hit Ratio** to tune TTL values
+
+---
+
+## Troubleshooting
+
+### Problem: Reflection mode returns "service not found"
+
+**Cause**: Backend server does not have Server Reflection enabled.
+
+**Solution**: Enable reflection on your server:
+```go
+reflection.Register(grpcServer)
+```
+
+### Problem: Hybrid mode falls back to passthrough
+
+**Cause**: Reflection timeout exceeded or reflection service unavailable.
+
+**Solution**:
+1. Check if reflection is enabled on the backend
+2. Increase `reflection_timeout` value
+3. Check network connectivity to reflection service
+
+### Problem: High memory usage
+
+**Cause**: Descriptor cache too large or TTL too long.
+
+**Solution**:
+```yaml
+# Reduce cache TTL
+descriptor_cache_ttl: 60 # 1 minute instead of 5 minutes
+```
+
+### Problem: Triple services not working
+
+**Cause**: Protocol detection not enabled.
+
+**Solution**:
+```yaml
+enable_protocol_detection: true
+```
+
+---
+
+## Migration Guide
+
+### From Passthrough to Reflection
+
+**Step 1**: Enable reflection on backend servers
+**Step 2**: Update Pixiu configuration:
+```yaml
+# Before
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config: {}
+
+# After
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+```
+
+### From Legacy ForceCodec to Reflection
+
+**Old Configuration (Deprecated):**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ force_codec: "passthrough"
+```
+
+**New Configuration (Recommended):**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "passthrough" # Same behavior, clearer name
+```
+
+---
+
+## Related Resources
+
+- [gRPC Server Reflection
Protocol](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) -
Official specification
+- [Issue #821](https://github.com/apache/dubbo-go-pixiu/issues/821) - Original
feature request
+- [PR #849](https://github.com/apache/dubbo-go-pixiu/pull/849) -
Implementation details
+- [Sample
Code](https://github.com/apache/dubbo-go-pixiu-samples/tree/master/grpc/reflection)
- Complete working examples
+
+---
+
+## Notes
+
+- **Filter Order**: gRPC Proxy filter should be the **only** filter in
`grpc_filters` list
+- **Server Reflection**: Required for `reflection` mode, optional for `hybrid`
mode
+- **Thread Safety**: Filter is thread-safe and supports high-concurrency
scenarios
+- **Backward Compatibility**: `force_codec` field is deprecated; use
`reflection_mode` instead
diff --git a/docs/user/filter/grpcproxy_CN.md b/docs/user/filter/grpcproxy_CN.md
new file mode 100644
index 00000000..e907896e
--- /dev/null
+++ b/docs/user/filter/grpcproxy_CN.md
@@ -0,0 +1,511 @@
+# gRPC 代理过滤器 (dgp.filter.grpc.proxy)
+
+[English](grpcproxy.md) | 中文
+
+---
+
+## 概述
+
+`dgp.filter.grpc.proxy` 过滤器为 Pixiu 网关提供 gRPC 代理功能,并支持 **gRPC Server
Reflection**。该特性使得网关可以在不需要预编译 proto 文件的情况下,动态解析和检查消息内容。
+
+### 核心特性
+
+- **三种反射模式**: 根据需求在性能和功能之间选择
+- **动态消息解码**: 在运行时解析和检查 gRPC 消息,无需 proto 文件
+- **基于 TTL 的缓存**: 高效的方法描述符缓存,自动清理过期条目
+- **协议检测**: 同时支持 gRPC 和 Triple 协议
+- **优雅降级**: 混合模式提供自动回退到透传模式
+
+---
+
+## 反射模式
+
+### 透传模式(Passthrough,默认)
+
+执行透明的二进制代理,不解码消息。
+
+**特点:**
+- 最高性能(无消息解析开销)
+- 无法检查消息内容
+- 纯二进制转发
+
+**使用场景:**
+- 不需要消息检查的高吞吐场景
+- 仅基于服务/方法名称的简单路由
+- 对向后兼容性要求极高的场景
+
+**配置:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "passthrough" # 或省略(默认值)
+```
+
+### 反射模式(Reflection)
+
+使用 gRPC Server Reflection API 动态解码和检查消息内容。
+
+**特点:**
+- 运行时完整消息解码
+- 支持基于内容的路由和过滤
+- 需要后端服务器支持反射
+- 由于反射调用,有轻微性能开销
+
+**使用场景:**
+- 基于内容的路由(根据消息字段路由)
+- 字段级别的过滤或转换
+- 完整消息检查的日志记录和调试
+- 需要消息验证的 API 网关场景
+
+**配置:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ descriptor_cache_ttl: 300 # 5分钟缓存
+```
+
+### 混合模式(Hybrid)
+
+先尝试反射,失败时回退到透传模式。
+
+**特点:**
+- 两全其美:可用时进行内容检查
+- 对不支持反射的服务优雅降级
+- 反射超时防止阻塞
+
+**使用场景:**
+- 反射支持程度不一的混合环境
+- 迁移场景(逐步启用反射)
+- 需要高可用的生产环境
+
+**配置:**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "hybrid"
+ reflection_timeout: 5s # 等待反射的最大时间
+```
+
+---
+
+## 配置
+
+### 完整配置示例
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-gateway"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # 反射模式(默认: "passthrough")
+ reflection_mode: "reflection"
+
+ # 方法描述符缓存 TTL(默认: 300秒)
+ descriptor_cache_ttl: 300
+
+ # 启用 Triple 协议检测(默认: false)
+ enable_protocol_detection: true
+
+ # 混合模式的反射超时(默认: 5秒)
+ reflection_timeout: 5s
+
+ # TLS 配置(可选)
+ enable_tls: false
+ tls_cert_file: ""
+ tls_key_file: ""
+
+ # 连接设置(可选)
+ keepalive_time: 300s
+ keepalive_timeout: 5s
+ connect_timeout: 5s
+ max_concurrent_streams: 0
+
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+```
+
+### 配置字段
+
+| 字段 | 类型 | 默认值 | 描述 |
+|------|------|--------|------|
+| `reflection_mode` | string | `"passthrough"` | 反射模式:
`"passthrough"`、`"reflection"` 或 `"hybrid"` |
+| `descriptor_cache_ttl` | int | `300` | 方法描述符缓存 TTL(秒) |
+| `enable_protocol_detection` | bool | `false` | 启用 Triple 协议检测 |
+| `reflection_timeout` | string | `"5s"` | 混合模式下的反射超时时间 |
+| `enable_tls` | bool | `false` | 启用后端连接的 TLS |
+| `tls_cert_file` | string | `""` | TLS 证书文件路径 |
+| `tls_key_file` | string | `""` | TLS 密钥文件路径 |
+| `keepalive_time` | string | `"300s"` | 后端连接的保活时间 |
+| `keepalive_timeout` | string | `"5s"` | 保活超时时间 |
+| `connect_timeout` | string | `"5s"` | 连接超时时间 |
+| `max_concurrent_streams` | uint32 | `0`(无限制) | 最大并发流数 |
+
+---
+
+## 启用服务器反射
+
+要使用 `reflection` 或 `hybrid` 模式,你的后端 gRPC 服务器必须启用 Server Reflection。
+
+### Go (gRPC-Go)
+
+```go
+import (
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+func main() {
+ server := grpc.NewServer()
+
+ // 注册你的服务
+ echo.RegisterEchoServiceServer(server, &echoServer{})
+
+ // 启用服务器反射(重要!)
+ reflection.Register(server)
+
+ // 启动服务器
+ lis, _ := net.Listen("tcp", ":50051")
+ server.Serve(lis)
+}
+```
+
+### Java (gRPC-Java)
+
+```java
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
+
+public class EchoServer {
+ public static void main(String[] args) throws Exception {
+ Server server = ServerBuilder
+ .forPort(50051)
+ .addService(new EchoServiceImpl())
+ // 启用服务器反射
+ .addService(ServerReflectionGrpc.newInstance())
+ .build();
+
+ server.start();
+ server.awaitTermination();
+ }
+}
+```
+
+### Python (gRPC-Python)
+
+```python
+import grpc
+from grpc_reflection.v1alpha import reflection
+from concurrent import futures
+
+class EchoService(echo_pb2_grpc.EchoServiceServicer):
+ # ... 实现 ...
+
+def serve():
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
+
+ # 启用服务器反射
+ reflection.enable_server_reflection(
+
service_names=[echo.DESCRIPTOR.services_by_name['EchoService'].full_name,
+ reflection.SERVICE_NAME],
+ server=server
+ )
+
+ server.add_insecure_port('[::]:50051')
+ server.start()
+ server.wait_for_termination()
+```
+
+---
+
+## 模式对比
+
+| 特性 | 透传模式 | 反射模式 | 混合模式 |
+|------|---------|---------|---------|
+| **性能** | ⭐⭐⭐⭐⭐(最佳) | ⭐⭐⭐(良好) | ⭐⭐⭐⭐(更好) |
+| **消息检查** | ❌ 否 | ✅ 是 | ✅ 是(可用时) |
+| **需要反射** | ❌ 否 | ✅ 是 | ❌ 可选 |
+| **回退支持** | N/A | ❌ 否 | ✅ 是 |
+| **使用场景** | 高性能代理 | 基于内容的路由 | 混合环境 |
+
+---
+
+## 描述符缓存
+
+反射模式使用基于 TTL 的缓存来存储从后端服务器获取的方法描述符。这减少了反射请求的数量并提高了性能。
+
+### 缓存行为
+
+| 操作 | 描述 |
+|------|------|
+| **缓存命中** | 立即返回缓存的描述符 |
+| **缓存未命中** | 通过反射 API 获取描述符并缓存 |
+| **过期** | 描述符在 TTL 后过期,下次访问时重新获取 |
+| **驱逐** | 缓存满时进行 LRU 驱逐 |
+
+### 缓存配置
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ # 缓存 TTL(秒),默认: 300
+ descriptor_cache_ttl: 300
+```
+
+**推荐的 TTL 值:**
+- 开发环境: `60`(1分钟)- 频繁更新
+- 测试环境: `300`(5分钟)- 平衡
+- 生产环境: `1800`(30分钟)- 稳定服务
+
+---
+
+## Triple 协议检测
+
+Pixiu 支持 **Dubbo Triple 协议**,这是 Apache Dubbo 社区开发的 gRPC 兼容协议。
+
+### 什么是 Triple 协议?
+
+Triple 是一个与 gRPC 兼容的协议,具有以下特点:
+- 像 gRPC 一样使用 HTTP/2
+- 支持 Protobuf 序列化
+- 添加 Triple 特定的元数据头(`tri-*`)
+- 使 gRPC 服务能与 Dubbo 生态系统协作
+
+### 启用协议检测
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ # 启用 Triple 协议检测
+ enable_protocol_detection: true
+```
+
+**启用后:**
+- Pixiu 自动检测 Triple 或 gRPC 协议
+- 提取 Triple 特定的元数据(`tri-service-version`、`tri-group` 等)
+- 确保与 Dubbo Triple 服务的兼容性
+
+---
+
+## 使用示例
+
+### 示例 1: 简单透传代理
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-proxy"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/"
+ route:
+ cluster: "backend-grpc"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config: {} # 默认透传模式
+ clusters:
+ - name: "backend-grpc"
+ endpoints:
+ - socket_address:
+ address: backend.example.com
+ port: 50051
+```
+
+### 示例 2: 使用反射进行基于内容的路由
+
+```yaml
+static_resources:
+ listeners:
+ - name: "grpc-gateway"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8882
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/myapi/"
+ route:
+ cluster: "api-v1"
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+ descriptor_cache_ttl: 600
+```
+
+### 示例 3: 迁移场景的混合模式
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # 先尝试反射,失败时回退到透传
+ reflection_mode: "hybrid"
+ reflection_timeout: 3s
+ enable_protocol_detection: true
+```
+
+---
+
+## 性能考虑
+
+### 模式性能
+
+| 模式 | 延迟影响 | 吞吐量 | 内存使用 |
+|------|---------|--------|---------|
+| 透传模式 | 最小 | 最高 | 最低 |
+| 反射模式 | +10-20% | 高 | 中等(缓存) |
+| 混合模式 | +5-10% | 较高 | 中等 |
+
+### 优化建议
+
+1. **使用透传模式** 当不需要消息检查时
+2. **启用缓存** 并为反射模式设置合适的 TTL
+3. **设置合理的超时** 防止混合模式阻塞
+4. **监控缓存命中率** 以调整 TTL 值
+
+---
+
+## 故障排除
+
+### 问题: 反射模式返回 "service not found"
+
+**原因**: 后端服务器未启用 Server Reflection。
+
+**解决方案**: 在服务器上启用反射:
+```go
+reflection.Register(grpcServer)
+```
+
+### 问题: 混合模式回退到透传模式
+
+**原因**: 反射超时或反射服务不可用。
+
+**解决方案**:
+1. 检查后端是否启用了反射
+2. 增加 `reflection_timeout` 值
+3. 检查到反射服务的网络连接
+
+### 问题: 内存使用过高
+
+**原因**: 描述符缓存过大或 TTL 过长。
+
+**解决方案**:
+```yaml
+# 减少缓存 TTL
+descriptor_cache_ttl: 60 # 1分钟而不是5分钟
+```
+
+### 问题: Triple 服务不工作
+
+**原因**: 未启用协议检测。
+
+**解决方案**:
+```yaml
+enable_protocol_detection: true
+```
+
+---
+
+## 迁移指南
+
+### 从透传模式迁移到反射模式
+
+**步骤 1**: 在后端服务器上启用反射
+**步骤 2**: 更新 Pixiu 配置:
+```yaml
+# 之前
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config: {}
+
+# 之后
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "reflection"
+```
+
+### 从旧的 ForceCodec 迁移到反射模式
+
+**旧配置(已弃用):**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ force_codec: "passthrough"
+```
+
+**新配置(推荐):**
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ reflection_mode: "passthrough" # 行为相同,名称更清晰
+```
+
+---
+
+## 相关资源
+
+- [gRPC Server Reflection
Protocol](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) -
官方规范
+- [Issue #821](https://github.com/apache/dubbo-go-pixiu/issues/821) - 原始功能请求
+- [PR #849](https://github.com/apache/dubbo-go-pixiu/pull/849) - 实现详情
+-
[示例代码](https://github.com/apache/dubbo-go-pixiu-samples/tree/master/grpc/reflection)
- 完整的示例代码
+
+---
+
+## 注意事项
+
+- **过滤器顺序**: gRPC Proxy 过滤器应该是 `grpc_filters` 列表中的**唯一**过滤器
+- **服务器反射**: `reflection` 模式必需,`hybrid` 模式可选
+- **线程安全**: 过滤器是线程安全的,支持高并发场景
+- **向后兼容**: `force_codec` 字段已弃用;请使用 `reflection_mode` 替代
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go
new file mode 100644
index 00000000..d604940d
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "go.uber.org/zap"
+
+ "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// TestCacheStats tests cache statistics tracking
+func TestCacheStats(t *testing.T) {
+ cache := NewDescriptorCache(5 * time.Minute)
+
+ // Initially no hits or misses
+ stats := cache.GetStats()
+ if stats.Hits != 0 || stats.Misses != 0 {
+ t.Errorf("Expected zero stats, got hits=%d, misses=%d",
stats.Hits, stats.Misses)
+ }
+
+ // Generate some misses
+ cache.Get("nonexistent")
+ cache.Get("also-nonexistent")
+
+ stats = cache.GetStats()
+ if stats.Misses != 2 {
+ t.Errorf("Expected 2 misses, got %d", stats.Misses)
+ }
+
+ // Check max cache size
+ if stats.MaxSize != defaultMaxCacheSize {
+ t.Errorf("Expected max_cache_size %d, got %d",
+ defaultMaxCacheSize, stats.MaxSize)
+ }
+
+ // Check TTL
+ if stats.TTL.Seconds() != 300 {
+ t.Errorf("Expected TTL 300s, got %v", stats.TTL.Seconds())
+ }
+
+ // Reset stats
+ cache.ResetStats()
+ stats = cache.GetStats()
+ if stats.Hits != 0 || stats.Misses != 0 {
+ t.Errorf("Expected stats to be reset, got hits=%d, misses=%d",
stats.Hits, stats.Misses)
+ }
+}
+
+// TestDynamicCacheSizeAdjustment tests dynamic cache size adjustment
+func TestDynamicCacheSizeAdjustment(t *testing.T) {
+ cache := NewDescriptorCache(5 * time.Minute)
+
+ // Setting below minimum should adjust to minimum
+ cache.SetMaxSize(10)
+ stats := cache.GetStats()
+ if stats.MaxSize != minCacheSize {
+ t.Errorf("Expected MaxSize %d (minCacheSize), got %d",
minCacheSize, stats.MaxSize)
+ }
+
+ // Setting above minimum should work
+ cache.SetMaxSize(500)
+ stats = cache.GetStats()
+ if stats.MaxSize != 500 {
+ t.Errorf("Expected MaxSize 500, got %d", stats.MaxSize)
+ }
+}
+
+// TestInvalidateByVersion tests version-based cache invalidation API
+func TestInvalidateByVersion(t *testing.T) {
+ cache := NewDescriptorCache(5 * time.Minute)
+
+ // Test with non-existent version hash
+ count := cache.InvalidateByVersion("non-existent-version")
+ if count != 0 {
+ t.Errorf("Expected 0 invalidations for non-existent version,
got %d", count)
+ }
+}
+
+// TestReflectionManagerConfig tests configuration updates
+func TestReflectionManagerConfig(t *testing.T) {
+ config := ReflectionConfig{
+ CacheTTL: 10 * time.Minute,
+ MaxCacheSize: 500,
+ ReflectionVersion: ReflectionV1Alpha,
+ ContinueOnError: true,
+ }
+
+ rm := NewReflectionManagerWithConfig(config)
+
+ // Verify initial config
+ initialConfig := rm.GetConfig()
+ if initialConfig.MaxCacheSize != 500 {
+ t.Errorf("Expected MaxCacheSize 500, got %d",
initialConfig.MaxCacheSize)
+ }
+
+ // Update config
+ newConfig := ReflectionConfig{
+ MaxCacheSize: 1000,
+ ContinueOnError: false,
+ }
+ rm.SetConfig(newConfig)
+
+ updatedConfig := rm.GetConfig()
+ if updatedConfig.MaxCacheSize != 1000 {
+ t.Errorf("Expected MaxCacheSize 1000, got %d",
updatedConfig.MaxCacheSize)
+ }
+ if updatedConfig.ContinueOnError != false {
+ t.Error("Expected ContinueOnError to be false")
+ }
+}
+
+// TestReflectionManagerCacheStats tests enhanced cache statistics
+func TestReflectionManagerCacheStats(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ stats := rm.GetCacheStats()
+
+ // Verify stats struct has expected values
+ if stats.MaxCacheSize != defaultMaxCacheSize {
+ t.Errorf("Expected MaxCacheSize %d, got %d",
+ defaultMaxCacheSize, stats.MaxCacheSize)
+ }
+
+ if stats.TTLSeconds != 300 {
+ t.Errorf("Expected TTLSeconds 300, got %v", stats.TTLSeconds)
+ }
+
+ // Initial stats should be zero
+ if stats.MethodCacheHits != 0 {
+ t.Errorf("Expected MethodCacheHits 0, got %d",
stats.MethodCacheHits)
+ }
+
+ if stats.FileRegistryCount != 0 {
+ t.Errorf("Expected FileRegistryCount 0, got %d",
stats.FileRegistryCount)
+ }
+}
+
+// TestGetMissingDependencies tests missing dependency tracking
+func TestGetMissingDependencies(t *testing.T) {
+ rm := NewReflectionManagerWithConfig(ReflectionConfig{
+ ContinueOnError: true,
+ })
+
+ // Initially no missing dependencies
+ missing := rm.GetMissingDependencies("test-address")
+ if missing != nil {
+ t.Errorf("Expected nil missing dependencies, got %v", missing)
+ }
+
+ // Get all missing dependencies should return empty map
+ allMissing := rm.GetAllMissingDependencies()
+ if len(allMissing) != 0 {
+ t.Errorf("Expected empty map, got %d entries", len(allMissing))
+ }
+}
+
+// TestReflectionManagerContinueOnError tests the ContinueOnError behavior
+func TestReflectionManagerContinueOnError(t *testing.T) {
+ tests := []struct {
+ name string
+ continueOnError bool
+ }{
+ {
+ name: "continue on error enabled",
+ continueOnError: true,
+ },
+ {
+ name: "continue on error disabled",
+ continueOnError: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ config := ReflectionConfig{
+ CacheTTL: 5 * time.Minute,
+ ContinueOnError: tt.continueOnError,
+ ReflectionVersion: ReflectionV1Alpha,
+ }
+
+ rm := NewReflectionManagerWithConfig(config)
+
+ if rm.GetConfig().ContinueOnError != tt.continueOnError
{
+ t.Errorf("Expected ContinueOnError %v, got %v",
+ tt.continueOnError,
rm.GetConfig().ContinueOnError)
+ }
+ })
+ }
+}
+
+// TestReflectionManagerInvalidationCache tests cache invalidation
+func TestReflectionManagerInvalidationCache(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ // Invalidate non-existent address should not panic
+ rm.InvalidateCache("non-existent-address")
+
+ // Clear cache should not panic
+ rm.ClearCache()
+
+ // Close should not panic
+ rm.Close()
+}
+
+// TestTopologicalSortCircularDependency tests handling of circular
dependencies
+func TestTopologicalSortCircularDependency(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ // Create circular dependency: A depends on B, B depends on A
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {
+ Name: strPtr("a.proto"),
+ Dependency: []string{"b.proto"},
+ Package: strPtr("test"),
+ },
+ {
+ Name: strPtr("b.proto"),
+ Dependency: []string{"a.proto"},
+ Package: strPtr("test"),
+ },
+ }
+
+ // Should handle circular dependency gracefully (return all files)
+ result, err := rm.topologicalSort(fileDescs)
+ if err != nil {
+ t.Errorf("Topological sort should handle circular dependencies,
got error: %v", err)
+ }
+
+ if len(result) != 2 {
+ t.Errorf("Expected 2 files in result, got %d", len(result))
+ }
+}
+
+// TestTopologicalSortEmptyList tests empty file descriptor list
+func TestTopologicalSortEmptyList(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ result, err := rm.topologicalSort([]*descriptorpb.FileDescriptorProto{})
+ if err != nil {
+ t.Errorf("Expected no error for empty list, got: %v", err)
+ }
+
+ if len(result) != 0 {
+ t.Errorf("Expected empty result, got %d files", len(result))
+ }
+}
+
+// TestTopologicalSortWithExternalDependencies tests handling of external
dependencies
+func TestTopologicalSortWithExternalDependencies(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {
+ Name: strPtr("a.proto"),
+ Dependency: []string{"google/protobuf/empty.proto"},
+ Package: strPtr("test"),
+ },
+ }
+
+ // Should handle external dependency gracefully
+ result, err := rm.topologicalSort(fileDescs)
+ if err != nil {
+ t.Errorf("Expected no error with external dependency, got: %v",
err)
+ }
+
+ if len(result) != 1 {
+ t.Errorf("Expected 1 file in result, got %d", len(result))
+ }
+}
+
+// TestReflectionVersionConstants tests reflection version constants
+func TestReflectionVersionConstants(t *testing.T) {
+ tests := []struct {
+ name string
+ version ReflectionVersion
+ valid bool
+ }{
+ {"v1alpha", ReflectionV1Alpha, true},
+ {"v1", ReflectionV1, true},
+ {"auto", ReflectionAuto, true},
+ {"invalid", ReflectionVersion("invalid"), true}, // Any string
is valid
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.version == "" {
+ t.Error("Reflection version should not be
empty")
+ }
+ })
+ }
+}
+
+// TestNewReflectionManagerWithInvalidConfig tests configuration validation
+func TestNewReflectionManagerWithInvalidConfig(t *testing.T) {
+ tests := []struct {
+ name string
+ config ReflectionConfig
+ expectCacheSize int
+ expectTTL time.Duration
+ }{
+ {
+ name: "zero TTL uses default",
+ config: ReflectionConfig{
+ CacheTTL: 0,
+ MaxCacheSize: 100,
+ ContinueOnError: true,
+ },
+ expectCacheSize: 100,
+ expectTTL: defaultDescCacheTTL,
+ },
+ {
+ name: "negative TTL uses default",
+ config: ReflectionConfig{
+ CacheTTL: -1,
+ MaxCacheSize: 100,
+ ContinueOnError: true,
+ },
+ expectCacheSize: 100,
+ expectTTL: defaultDescCacheTTL,
+ },
+ {
+ name: "cache size below minimum",
+ config: ReflectionConfig{
+ CacheTTL: 5 * time.Minute,
+ MaxCacheSize: 10,
+ ContinueOnError: true,
+ },
+ expectCacheSize: minCacheSize,
+ expectTTL: 5 * time.Minute,
+ },
+ {
+ name: "empty reflection version",
+ config: ReflectionConfig{
+ CacheTTL: 5 * time.Minute,
+ MaxCacheSize: 100,
+ ReflectionVersion: "",
+ ContinueOnError: true,
+ },
+ expectCacheSize: 100,
+ expectTTL: 5 * time.Minute,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ rm := NewReflectionManagerWithConfig(tt.config)
+
+ config := rm.GetConfig()
+ if config.MaxCacheSize != tt.expectCacheSize {
+ t.Errorf("Expected cache size %d, got %d",
tt.expectCacheSize, config.MaxCacheSize)
+ }
+
+ if config.CacheTTL != tt.expectTTL {
+ t.Errorf("Expected TTL %v, got %v",
tt.expectTTL, config.CacheTTL)
+ }
+ })
+ }
+}
+
+// Helper function for string pointers
+func strPtr(s string) *string {
+ return &s
+}
+
+// Init logger for tests
+func init() {
+ cfg := zap.NewDevelopmentConfig()
+ cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
+ logger.InitLogger(&cfg)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go
b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go
new file mode 100644
index 00000000..9b937a09
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+import (
+ "github.com/dubbogo/gost/container/gxlru"
+
+ "google.golang.org/protobuf/reflect/protoreflect"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// Constants for cache configuration
+const (
+ defaultMaxCacheSize = 1000 // Maximum number of cached descriptors
+ minCacheSize = 100 // Minimum cache size to prevent thrashing
+ // Logger prefix for descriptor cache operations
+ loggerPrefix = "[grpc-proxy-descriptor-cache] "
+)
+
+// cacheEntry represents a cached method descriptor with metadata
+// Implements gxlru.Value interface (Size always returns 1 for counting)
+type cacheEntry struct {
+ key string
+ descriptor protoreflect.MethodDescriptor
+ expireAt time.Time
+ versionHash string // Hash of the descriptor for version detection
+}
+
+// Size implements gxlru.Value interface - returns 1 for per-entry counting
+func (e *cacheEntry) Size() int {
+ return 1
+}
+
+// CacheStats holds cache statistics for monitoring (returned by GetStats)
+type CacheStats struct {
+ Hits int64
+ Misses int64
+ Evictions int64
+ Size int
+ MaxSize int
+ TTL time.Duration
+ HitRatio float64
+}
+
+// DescriptorCache provides TTL-based caching with LRU eviction for gRPC
method descriptors
+type DescriptorCache struct {
+ lru *gxlru.LRUCache // LRU cache from gost
+ entries sync.Map // key -> *cacheEntry (for TTL and version
lookup)
+ ttl time.Duration
+ stopCh chan struct{}
+ closeOnce sync.Once
+ maxSize atomic.Int32 // Maximum cache size (atomic for lock-free
reads)
+ // Atomic statistics for lock-free updates
+ hits atomic.Int64
+ misses atomic.Int64
+ evictions atomic.Int64
+}
+
+// NewDescriptorCache creates a new descriptor cache with the specified TTL
+func NewDescriptorCache(ttl time.Duration) *DescriptorCache {
+ return NewDescriptorCacheWithSize(ttl, defaultMaxCacheSize)
+}
+
+// NewDescriptorCacheWithSize creates a new descriptor cache with custom size
limit
+func NewDescriptorCacheWithSize(ttl time.Duration, maxSize int)
*DescriptorCache {
+ if maxSize < minCacheSize {
+ maxSize = minCacheSize
+ logger.Infof("%sCache size adjusted to minimum: %d",
loggerPrefix, maxSize)
+ }
+
+ cache := &DescriptorCache{
+ lru: gxlru.NewLRUCache(int64(maxSize)),
+ ttl: ttl,
+ stopCh: make(chan struct{}),
+ }
+ cache.maxSize.Store(int32(maxSize))
+
+ go cache.cleanupLoop()
+ return cache
+}
+
+// Get retrieves a method descriptor from cache
+// Returns nil if not found or expired
+func (c *DescriptorCache) Get(key string) protoreflect.MethodDescriptor {
+ entry, ok := c.entries.Load(key)
+ if !ok {
+ c.misses.Add(1)
+ return nil
+ }
+
+ e := entry.(*cacheEntry)
+ if time.Now().Before(e.expireAt) {
+ // Valid cache hit
+ c.hits.Add(1)
+ return e.descriptor
+ }
+
+ // Entry expired, delete it
+ c.entries.Delete(key)
+ c.lru.Delete(key)
+ c.misses.Add(1)
+ return nil
+}
+
+// GetWithVersion retrieves a method descriptor and returns version mismatch
status
+// Returns (descriptor, true) if found and version matches
+// Returns (descriptor, false) if found but version differs (stale)
+// Returns (nil, false) if not found or expired
+func (c *DescriptorCache) GetWithVersion(key string, versionHash string)
(protoreflect.MethodDescriptor, bool) {
+ entry, ok := c.entries.Load(key)
+ if !ok {
+ c.misses.Add(1)
+ return nil, false
+ }
+
+ e := entry.(*cacheEntry)
+ if time.Now().After(e.expireAt) {
+ // Entry expired
+ c.entries.Delete(key)
+ c.lru.Delete(key)
+ c.misses.Add(1)
+ return nil, false
+ }
+
+ // Check version hash
+ if e.versionHash != "" && versionHash != "" && e.versionHash !=
versionHash {
+ // Version mismatch - stale cache
+ logger.Debugf("%sCache version mismatch for %s: cached=%s,
current=%s",
+ loggerPrefix, key, e.versionHash, versionHash)
+ c.entries.Delete(key)
+ c.lru.Delete(key)
+ c.misses.Add(1)
+ return nil, false
+ }
+
+ // Valid cache hit
+ c.hits.Add(1)
+ return e.descriptor, true
+}
+
+// Set stores a method descriptor in the cache with optional version hash
+func (c *DescriptorCache) Set(key string, descriptor
protoreflect.MethodDescriptor) {
+ c.SetWithVersion(key, descriptor, "")
+}
+
+// SetWithVersion stores a method descriptor with version hash for change
detection
+func (c *DescriptorCache) SetWithVersion(key string, descriptor
protoreflect.MethodDescriptor, versionHash string) {
+ newEntry := &cacheEntry{
+ key: key,
+ descriptor: descriptor,
+ expireAt: time.Now().Add(c.ttl),
+ versionHash: versionHash,
+ }
+
+ // Add to LRU cache (handles eviction automatically)
+ c.lru.Set(key, newEntry)
+
+ // Store in entries map for TTL/version lookup (possibly replacing old
entry)
+ c.entries.Store(key, newEntry)
+}
+
+// Delete removes a specific entry from the cache
+func (c *DescriptorCache) Delete(key string) {
+ c.entries.Delete(key)
+ c.lru.Delete(key)
+}
+
+// InvalidateByVersion removes all entries with a specific version hash
+// Useful when service definitions are updated
+func (c *DescriptorCache) InvalidateByVersion(versionHash string) int {
+ count := 0
+ c.entries.Range(func(key, value any) bool {
+ entry := value.(*cacheEntry)
+ if entry.versionHash == versionHash {
+ c.entries.Delete(key)
+ c.lru.Delete(key.(string))
+ count++
+ }
+ return true
+ })
+ if count > 0 {
+ logger.Infof("%sInvalidated %d cache entries with version hash:
%s", loggerPrefix, count, versionHash)
+ }
+ return count
+}
+
+// Clear removes all entries from the cache
+func (c *DescriptorCache) Clear() {
+ c.entries.Range(func(key, value any) bool {
+ c.entries.Delete(key)
+ c.lru.Delete(key.(string))
+ return true
+ })
+ c.lru.Clear()
+}
+
+// Size returns the current number of entries in the cache
+func (c *DescriptorCache) Size() int {
+ return int(c.lru.Length())
+}
+
+// GetStats returns current cache statistics
+func (c *DescriptorCache) GetStats() CacheStats {
+ hits := c.hits.Load()
+ misses := c.misses.Load()
+ total := hits + misses
+
+ var hitRatio float64
+ if total > 0 {
+ hitRatio = float64(hits) / float64(total)
+ }
+
+ // Use gost's eviction count + our own tracking
+ evictions := c.evictions.Load() + c.lru.Evictions()
+
+ return CacheStats{
+ Hits: hits,
+ Misses: misses,
+ Evictions: evictions,
+ Size: int(c.lru.Length()),
+ MaxSize: int(c.maxSize.Load()),
+ TTL: c.ttl,
+ HitRatio: hitRatio,
+ }
+}
+
+// ResetStats clears cache statistics (atomic reset)
+func (c *DescriptorCache) ResetStats() {
+ c.hits.Store(0)
+ c.misses.Store(0)
+ c.evictions.Store(0)
+ // Note: gost's internal eviction counter cannot be reset
+}
+
+// SetMaxSize dynamically adjusts the maximum cache size
+func (c *DescriptorCache) SetMaxSize(size int) {
+ if size < minCacheSize {
+ size = minCacheSize
+ }
+ c.maxSize.Store(int32(size))
+ c.lru.SetCapacity(int64(size))
+}
+
+// cleanupLoop periodically removes expired entries
+func (c *DescriptorCache) cleanupLoop() {
+ ticker := time.NewTicker(c.ttl / 2)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ now := time.Now()
+ c.entries.Range(func(key, value any) bool {
+ entry := value.(*cacheEntry)
+ if now.After(entry.expireAt) {
+ c.entries.Delete(key)
+ c.lru.Delete(key.(string))
+ }
+ return true
+ })
+ case <-c.stopCh:
+ return
+ }
+ }
+}
+
+// Close stops the cleanup goroutine (safe to call multiple times)
+func (c *DescriptorCache) Close() {
+ c.closeOnce.Do(func() {
+ close(c.stopCh)
+ })
+}
+
+// BuildCacheKey creates a cache key from service and method names
+func BuildCacheKey(address, serviceName, methodName string) string {
+ return address + "/" + serviceName + "/" + methodName
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go
new file mode 100644
index 00000000..eb9adc4d
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "testing"
+ "time"
+)
+
+func TestDescriptorCache_SetAndGet(t *testing.T) {
+ cache := NewDescriptorCache(1 * time.Minute)
+ defer cache.Close()
+
+ key := "test/service/method"
+
+ // Initially should be nil
+ if got := cache.Get(key); got != nil {
+ t.Errorf("Get() on empty cache = %v, want nil", got)
+ }
+
+ // Set a nil value (valid for testing)
+ cache.Set(key, nil)
+
+ // Should be able to get it back (even if nil)
+ // Note: Our implementation stores the entry but returns nil descriptor
+ // This is acceptable behavior as it caches the "not found" state
+}
+
+func TestDescriptorCache_Expiration(t *testing.T) {
+ // Use very short TTL for testing
+ ttl := 100 * time.Millisecond
+ cache := NewDescriptorCache(ttl)
+ defer cache.Close()
+
+ key := "test/service/method"
+ cache.Set(key, nil)
+
+ // Should exist before expiration
+ // Wait a bit less than TTL
+ time.Sleep(50 * time.Millisecond)
+
+ // Entry should still be valid (we can't test the descriptor value
easily
+ // without a real MethodDescriptor, but we can verify the cache
mechanics)
+
+ // Wait for expiration
+ time.Sleep(100 * time.Millisecond)
+
+ // After expiration, Get should return nil
+ if got := cache.Get(key); got != nil {
+ t.Errorf("Get() after expiration = %v, want nil", got)
+ }
+}
+
+func TestDescriptorCache_Delete(t *testing.T) {
+ cache := NewDescriptorCache(1 * time.Minute)
+ defer cache.Close()
+
+ key := "test/service/method"
+ cache.Set(key, nil)
+
+ // Delete the entry
+ cache.Delete(key)
+
+ // Should be nil after deletion
+ if got := cache.Get(key); got != nil {
+ t.Errorf("Get() after Delete() = %v, want nil", got)
+ }
+}
+
+func TestDescriptorCache_Clear(t *testing.T) {
+ cache := NewDescriptorCache(1 * time.Minute)
+ defer cache.Close()
+
+ // Add multiple entries
+ keys := []string{"key1", "key2", "key3"}
+ for _, k := range keys {
+ cache.Set(k, nil)
+ }
+
+ // Verify size
+ if size := cache.Size(); size != 3 {
+ t.Errorf("Size() = %d, want 3", size)
+ }
+
+ // Clear all
+ cache.Clear()
+
+ // Verify all cleared
+ if size := cache.Size(); size != 0 {
+ t.Errorf("Size() after Clear() = %d, want 0", size)
+ }
+
+ for _, k := range keys {
+ if got := cache.Get(k); got != nil {
+ t.Errorf("Get(%q) after Clear() = %v, want nil", k, got)
+ }
+ }
+}
+
+func TestDescriptorCache_Size(t *testing.T) {
+ cache := NewDescriptorCache(1 * time.Minute)
+ defer cache.Close()
+
+ // Initially empty
+ if size := cache.Size(); size != 0 {
+ t.Errorf("Size() on empty cache = %d, want 0", size)
+ }
+
+ // Add entries
+ for i := 0; i < 5; i++ {
+ cache.Set("key"+string(rune('0'+i)), nil)
+ }
+
+ if size := cache.Size(); size != 5 {
+ t.Errorf("Size() = %d, want 5", size)
+ }
+}
+
+func TestBuildCacheKey(t *testing.T) {
+ tests := []struct {
+ address string
+ serviceName string
+ methodName string
+ expected string
+ }{
+ {
+ address: "localhost:50051",
+ serviceName: "helloworld.Greeter",
+ methodName: "SayHello",
+ expected:
"localhost:50051/helloworld.Greeter/SayHello",
+ },
+ {
+ address: "backend.example.com:8080",
+ serviceName: "my.package.Service",
+ methodName: "Method",
+ expected:
"backend.example.com:8080/my.package.Service/Method",
+ },
+ {
+ address: "",
+ serviceName: "Service",
+ methodName: "Method",
+ expected: "/Service/Method",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.expected, func(t *testing.T) {
+ result := BuildCacheKey(tt.address, tt.serviceName,
tt.methodName)
+ if result != tt.expected {
+ t.Errorf("BuildCacheKey(%q, %q, %q) = %q, want
%q",
+ tt.address, tt.serviceName,
tt.methodName, result, tt.expected)
+ }
+ })
+ }
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go
b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go
new file mode 100644
index 00000000..c9943c7a
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "fmt"
+)
+
+import (
+ "google.golang.org/protobuf/encoding/protojson"
+
+ "google.golang.org/protobuf/proto"
+
+ "google.golang.org/protobuf/reflect/protoreflect"
+
+ "google.golang.org/protobuf/types/dynamicpb"
+)
+
+// DynamicCodec is a gRPC codec that uses protobuf reflection to encode/decode
messages.
+// This enables the gateway to inspect and manipulate message content.
+type DynamicCodec struct {
+ methodDesc protoreflect.MethodDescriptor
+}
+
+// NewDynamicCodec creates a new DynamicCodec for the given method descriptor
+func NewDynamicCodec(methodDesc protoreflect.MethodDescriptor) *DynamicCodec {
+ return &DynamicCodec{
+ methodDesc: methodDesc,
+ }
+}
+
+// Marshal encodes a message to bytes
+func (c *DynamicCodec) Marshal(v any) ([]byte, error) {
+ switch msg := v.(type) {
+ case *dynamicpb.Message:
+ return proto.Marshal(msg)
+ case proto.Message:
+ return proto.Marshal(msg)
+ case []byte:
+ // Already bytes, pass through
+ return msg, nil
+ case *DynamicMessage:
+ return proto.Marshal(msg.Message)
+ default:
+ return nil, fmt.Errorf("dynamic codec: cannot marshal type %T",
v)
+ }
+}
+
+// Unmarshal decodes bytes into a message
+func (c *DynamicCodec) Unmarshal(data []byte, v any) error {
+ switch msg := v.(type) {
+ case **DynamicMessage:
+ // Create dynamic message for input type
+ inputType := c.methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+ if err := proto.Unmarshal(data, dynMsg); err != nil {
+ return fmt.Errorf("dynamic codec: unmarshal error: %w",
err)
+ }
+ *msg = &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+ return nil
+ case *[]byte:
+ // Passthrough mode
+ *msg = data
+ return nil
+ case proto.Message:
+ return proto.Unmarshal(data, msg)
+ default:
+ return fmt.Errorf("dynamic codec: cannot unmarshal into type
%T", v)
+ }
+}
+
+// Name returns the codec name
+func (c *DynamicCodec) Name() string {
+ return "dynamic_proto"
+}
+
+// DynamicMessage wraps a dynamicpb.Message with its descriptor
+type DynamicMessage struct {
+ Message *dynamicpb.Message
+ Descriptor protoreflect.MessageDescriptor
+}
+
+// GetField retrieves a field value by name
+func (dm *DynamicMessage) GetField(name string) (protoreflect.Value, bool) {
+ fd := dm.Descriptor.Fields().ByName(protoreflect.Name(name))
+ if fd == nil {
+ return protoreflect.Value{}, false
+ }
+ return dm.Message.Get(fd), true
+}
+
+// GetFieldString retrieves a string field value by name
+func (dm *DynamicMessage) GetFieldString(name string) (string, bool) {
+ val, ok := dm.GetField(name)
+ if !ok {
+ return "", false
+ }
+ return val.String(), true
+}
+
+// GetFieldInt retrieves an int64 field value by name
+func (dm *DynamicMessage) GetFieldInt(name string) (int64, bool) {
+ val, ok := dm.GetField(name)
+ if !ok {
+ return 0, false
+ }
+ return val.Int(), true
+}
+
+// SetField sets a field value by name
+func (dm *DynamicMessage) SetField(name string, value protoreflect.Value) bool
{
+ fd := dm.Descriptor.Fields().ByName(protoreflect.Name(name))
+ if fd == nil {
+ return false
+ }
+ dm.Message.Set(fd, value)
+ return true
+}
+
+// ToBytes serializes the message to bytes
+func (dm *DynamicMessage) ToBytes() ([]byte, error) {
+ return proto.Marshal(dm.Message)
+}
+
+// ToJSON converts the message to JSON (for logging/debugging)
+func (dm *DynamicMessage) ToJSON() ([]byte, error) {
+ return protojson.Marshal(dm.Message)
+}
+
+// ResponseCodec is a codec for handling response messages
+type ResponseCodec struct {
+ methodDesc protoreflect.MethodDescriptor
+}
+
+// NewResponseCodec creates a codec for response messages
+func NewResponseCodec(methodDesc protoreflect.MethodDescriptor) *ResponseCodec
{
+ return &ResponseCodec{
+ methodDesc: methodDesc,
+ }
+}
+
+// Marshal encodes a response message
+func (c *ResponseCodec) Marshal(v any) ([]byte, error) {
+ switch msg := v.(type) {
+ case *dynamicpb.Message:
+ return proto.Marshal(msg)
+ case proto.Message:
+ return proto.Marshal(msg)
+ case []byte:
+ return msg, nil
+ case *DynamicMessage:
+ return proto.Marshal(msg.Message)
+ default:
+ return nil, fmt.Errorf("response codec: cannot marshal type
%T", v)
+ }
+}
+
+// Unmarshal decodes response bytes
+func (c *ResponseCodec) Unmarshal(data []byte, v any) error {
+ switch msg := v.(type) {
+ case **DynamicMessage:
+ // Create dynamic message for output type
+ outputType := c.methodDesc.Output()
+ dynMsg := dynamicpb.NewMessage(outputType)
+ if err := proto.Unmarshal(data, dynMsg); err != nil {
+ return fmt.Errorf("response codec: unmarshal error:
%w", err)
+ }
+ *msg = &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: outputType,
+ }
+ return nil
+ case *[]byte:
+ *msg = data
+ return nil
+ case proto.Message:
+ return proto.Unmarshal(data, msg)
+ default:
+ return fmt.Errorf("response codec: cannot unmarshal into type
%T", v)
+ }
+}
+
+// Name returns the codec name
+func (c *ResponseCodec) Name() string {
+ return "dynamic_proto"
+}
+
+// DecodeRequest decodes a request using the method descriptor
+func DecodeRequest(methodDesc protoreflect.MethodDescriptor, data []byte)
(*DynamicMessage, error) {
+ inputType := methodDesc.Input()
+ msg := dynamicpb.NewMessage(inputType)
+ if err := proto.Unmarshal(data, msg); err != nil {
+ return nil, fmt.Errorf("failed to decode request: %w", err)
+ }
+ return &DynamicMessage{
+ Message: msg,
+ Descriptor: inputType,
+ }, nil
+}
+
+// DecodeResponse decodes a response using the method descriptor
+func DecodeResponse(methodDesc protoreflect.MethodDescriptor, data []byte)
(*DynamicMessage, error) {
+ outputType := methodDesc.Output()
+ msg := dynamicpb.NewMessage(outputType)
+ if err := proto.Unmarshal(data, msg); err != nil {
+ return nil, fmt.Errorf("failed to decode response: %w", err)
+ }
+ return &DynamicMessage{
+ Message: msg,
+ Descriptor: outputType,
+ }, nil
+}
+
+// EncodeMessage encodes a dynamic message to bytes
+func EncodeMessage(msg *DynamicMessage) ([]byte, error) {
+ return proto.Marshal(msg.Message)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go
new file mode 100644
index 00000000..40ba3f35
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "strings"
+ "testing"
+)
+
+import (
+ "google.golang.org/protobuf/proto"
+
+ "google.golang.org/protobuf/reflect/protodesc"
+ "google.golang.org/protobuf/reflect/protoreflect"
+
+ "google.golang.org/protobuf/types/descriptorpb"
+ "google.golang.org/protobuf/types/dynamicpb"
+)
+
+// buildTestMethodDescriptor creates a test method descriptor for unit testing
+func buildTestMethodDescriptor(t *testing.T) protoreflect.MethodDescriptor {
+ // Define a simple proto file descriptor
+ fileDescProto := &descriptorpb.FileDescriptorProto{
+ Name: proto.String("test.proto"),
+ Package: proto.String("test"),
+ MessageType: []*descriptorpb.DescriptorProto{
+ {
+ Name: proto.String("TestRequest"),
+ Field: []*descriptorpb.FieldDescriptorProto{
+ {
+ Name: proto.String("name"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ {
+ Name: proto.String("id"),
+ Number: proto.Int32(2),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ {
+ Name: proto.String("TestResponse"),
+ Field: []*descriptorpb.FieldDescriptorProto{
+ {
+ Name: proto.String("message"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ },
+ Service: []*descriptorpb.ServiceDescriptorProto{
+ {
+ Name: proto.String("TestService"),
+ Method: []*descriptorpb.MethodDescriptorProto{
+ {
+ Name:
proto.String("TestMethod"),
+ InputType:
proto.String(".test.TestRequest"),
+ OutputType:
proto.String(".test.TestResponse"),
+ },
+ },
+ },
+ },
+ }
+
+ // Build file descriptor
+ fileDesc, err := protodesc.NewFile(fileDescProto, nil)
+ if err != nil {
+ t.Fatalf("Failed to create file descriptor: %v", err)
+ }
+
+ // Get service and method
+ svcDesc := fileDesc.Services().ByName("TestService")
+ if svcDesc == nil {
+ t.Fatal("TestService not found")
+ }
+
+ methodDesc := svcDesc.Methods().ByName("TestMethod")
+ if methodDesc == nil {
+ t.Fatal("TestMethod not found")
+ }
+
+ return methodDesc
+}
+
+func TestDynamicCodec_Name(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ if got := codec.Name(); got != "dynamic_proto" {
+ t.Errorf("Name() = %q, want %q", got, "dynamic_proto")
+ }
+}
+
+func TestDynamicCodec_MarshalBytes(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ // Test marshaling raw bytes (passthrough)
+ data := []byte{0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
+ result, err := codec.Marshal(data)
+ if err != nil {
+ t.Fatalf("Marshal([]byte) error = %v", err)
+ }
+ if string(result) != string(data) {
+ t.Errorf("Marshal([]byte) = %v, want %v", result, data)
+ }
+}
+
+func TestDynamicCodec_MarshalDynamicMessage(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ // Create a dynamic message
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ // Set field values
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("test"))
+
+ // Marshal as DynamicMessage wrapper
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ result, err := codec.Marshal(dm)
+ if err != nil {
+ t.Fatalf("Marshal(*DynamicMessage) error = %v", err)
+ }
+
+ // Verify by unmarshaling
+ newMsg := dynamicpb.NewMessage(inputType)
+ if err := proto.Unmarshal(result, newMsg); err != nil {
+ t.Fatalf("Unmarshal verification error = %v", err)
+ }
+
+ if got := newMsg.Get(nameField).String(); got != "test" {
+ t.Errorf("Unmarshaled name = %q, want %q", got, "test")
+ }
+}
+
+func TestDynamicCodec_MarshalUnsupportedType(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ // Try to marshal unsupported type
+ _, err := codec.Marshal(123)
+ if err == nil {
+ t.Error("Marshal(int) should return error")
+ }
+}
+
+func TestDynamicCodec_UnmarshalToBytes(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ data := []byte{0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
+ var result []byte
+ err := codec.Unmarshal(data, &result)
+ if err != nil {
+ t.Fatalf("Unmarshal(to []byte) error = %v", err)
+ }
+ if string(result) != string(data) {
+ t.Errorf("Unmarshal result = %v, want %v", result, data)
+ }
+}
+
+func TestDynamicCodec_UnmarshalToDynamicMessage(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ // Create test data
+ inputType := methodDesc.Input()
+ srcMsg := dynamicpb.NewMessage(inputType)
+ nameField := inputType.Fields().ByName("name")
+ idField := inputType.Fields().ByName("id")
+ srcMsg.Set(nameField, protoreflect.ValueOfString("hello"))
+ srcMsg.Set(idField, protoreflect.ValueOfInt64(42))
+
+ data, err := proto.Marshal(srcMsg)
+ if err != nil {
+ t.Fatalf("Failed to marshal source message: %v", err)
+ }
+
+ // Unmarshal to DynamicMessage
+ var dm *DynamicMessage
+ err = codec.Unmarshal(data, &dm)
+ if err != nil {
+ t.Fatalf("Unmarshal(to *DynamicMessage) error = %v", err)
+ }
+
+ if dm == nil {
+ t.Fatal("Unmarshal result is nil")
+ }
+
+ // Verify fields
+ if got := dm.Message.Get(nameField).String(); got != "hello" {
+ t.Errorf("name = %q, want %q", got, "hello")
+ }
+ if got := dm.Message.Get(idField).Int(); got != 42 {
+ t.Errorf("id = %d, want %d", got, 42)
+ }
+}
+
+func TestDynamicCodec_UnmarshalUnsupportedType(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewDynamicCodec(methodDesc)
+
+ data := []byte{0x0a, 0x05}
+ var result int
+ err := codec.Unmarshal(data, &result)
+ if err == nil {
+ t.Error("Unmarshal(to int) should return error")
+ }
+}
+
+func TestDynamicMessage_GetField(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("test_value"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ // Test GetField with existing field
+ val, ok := dm.GetField("name")
+ if !ok {
+ t.Error("GetField(name) returned false")
+ }
+ if val.String() != "test_value" {
+ t.Errorf("GetField(name) = %q, want %q", val.String(),
"test_value")
+ }
+
+ // Test GetField with non-existing field
+ _, ok = dm.GetField("nonexistent")
+ if ok {
+ t.Error("GetField(nonexistent) should return false")
+ }
+}
+
+func TestDynamicMessage_GetFieldString(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("hello_world"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ // Test existing field
+ val, ok := dm.GetFieldString("name")
+ if !ok {
+ t.Error("GetFieldString(name) returned false")
+ }
+ if val != "hello_world" {
+ t.Errorf("GetFieldString(name) = %q, want %q", val,
"hello_world")
+ }
+
+ // Test non-existing field
+ _, ok = dm.GetFieldString("nonexistent")
+ if ok {
+ t.Error("GetFieldString(nonexistent) should return false")
+ }
+}
+
+func TestDynamicMessage_GetFieldInt(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ idField := inputType.Fields().ByName("id")
+ dynMsg.Set(idField, protoreflect.ValueOfInt64(12345))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ // Test existing field
+ val, ok := dm.GetFieldInt("id")
+ if !ok {
+ t.Error("GetFieldInt(id) returned false")
+ }
+ if val != 12345 {
+ t.Errorf("GetFieldInt(id) = %d, want %d", val, 12345)
+ }
+
+ // Test non-existing field
+ _, ok = dm.GetFieldInt("nonexistent")
+ if ok {
+ t.Error("GetFieldInt(nonexistent) should return false")
+ }
+}
+
+func TestDynamicMessage_SetField(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ // Set existing field
+ ok := dm.SetField("name", protoreflect.ValueOfString("new_value"))
+ if !ok {
+ t.Error("SetField(name) returned false")
+ }
+
+ // Verify
+ val, _ := dm.GetFieldString("name")
+ if val != "new_value" {
+ t.Errorf("After SetField, name = %q, want %q", val, "new_value")
+ }
+
+ // Set non-existing field
+ ok = dm.SetField("nonexistent", protoreflect.ValueOfString("value"))
+ if ok {
+ t.Error("SetField(nonexistent) should return false")
+ }
+}
+
+func TestDynamicMessage_ToBytes(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("serialize_test"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ data, err := dm.ToBytes()
+ if err != nil {
+ t.Fatalf("ToBytes() error = %v", err)
+ }
+
+ // Verify by unmarshaling
+ newMsg := dynamicpb.NewMessage(inputType)
+ if err := proto.Unmarshal(data, newMsg); err != nil {
+ t.Fatalf("Unmarshal verification error = %v", err)
+ }
+
+ if got := newMsg.Get(nameField).String(); got != "serialize_test" {
+ t.Errorf("After ToBytes, name = %q, want %q", got,
"serialize_test")
+ }
+}
+
+func TestDynamicMessage_ToJSON(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+ dynMsg := dynamicpb.NewMessage(inputType)
+
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("json_test"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ data, err := dm.ToJSON()
+ if err != nil {
+ t.Fatalf("ToJSON() error = %v", err)
+ }
+
+ // Verify it returns valid JSON with the expected field
+ if len(data) == 0 {
+ t.Error("ToJSON() returned empty data")
+ }
+
+ // Should contain the field name in JSON format
+ jsonStr := string(data)
+ if !strings.Contains(jsonStr, "name") || !strings.Contains(jsonStr,
"json_test") {
+ t.Errorf("ToJSON() = %s, expected to contain 'name' and
'json_test'", jsonStr)
+ }
+}
+
+func TestResponseCodec_Name(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewResponseCodec(methodDesc)
+
+ if got := codec.Name(); got != "dynamic_proto" {
+ t.Errorf("Name() = %q, want %q", got, "dynamic_proto")
+ }
+}
+
+func TestResponseCodec_MarshalAndUnmarshal(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ codec := NewResponseCodec(methodDesc)
+
+ // Create output message
+ outputType := methodDesc.Output()
+ dynMsg := dynamicpb.NewMessage(outputType)
+ msgField := outputType.Fields().ByName("message")
+ dynMsg.Set(msgField, protoreflect.ValueOfString("response_value"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: outputType,
+ }
+
+ // Marshal
+ data, err := codec.Marshal(dm)
+ if err != nil {
+ t.Fatalf("Marshal error = %v", err)
+ }
+
+ // Unmarshal
+ var result *DynamicMessage
+ err = codec.Unmarshal(data, &result)
+ if err != nil {
+ t.Fatalf("Unmarshal error = %v", err)
+ }
+
+ // Verify
+ val, ok := result.GetFieldString("message")
+ if !ok {
+ t.Error("GetFieldString(message) returned false")
+ }
+ if val != "response_value" {
+ t.Errorf("message = %q, want %q", val, "response_value")
+ }
+}
+
+func TestDecodeRequest(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+
+ // Create test message
+ srcMsg := dynamicpb.NewMessage(inputType)
+ nameField := inputType.Fields().ByName("name")
+ srcMsg.Set(nameField, protoreflect.ValueOfString("decode_test"))
+
+ data, err := proto.Marshal(srcMsg)
+ if err != nil {
+ t.Fatalf("Marshal error = %v", err)
+ }
+
+ // Decode
+ result, err := DecodeRequest(methodDesc, data)
+ if err != nil {
+ t.Fatalf("DecodeRequest error = %v", err)
+ }
+
+ // Verify
+ val, ok := result.GetFieldString("name")
+ if !ok {
+ t.Error("GetFieldString(name) returned false")
+ }
+ if val != "decode_test" {
+ t.Errorf("name = %q, want %q", val, "decode_test")
+ }
+}
+
+func TestDecodeRequest_InvalidData(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+
+ // Invalid protobuf data
+ invalidData := []byte{0xff, 0xff, 0xff}
+ _, err := DecodeRequest(methodDesc, invalidData)
+ if err == nil {
+ t.Error("DecodeRequest with invalid data should return error")
+ }
+}
+
+func TestDecodeResponse(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ outputType := methodDesc.Output()
+
+ // Create test message
+ srcMsg := dynamicpb.NewMessage(outputType)
+ msgField := outputType.Fields().ByName("message")
+ srcMsg.Set(msgField, protoreflect.ValueOfString("response_decode_test"))
+
+ data, err := proto.Marshal(srcMsg)
+ if err != nil {
+ t.Fatalf("Marshal error = %v", err)
+ }
+
+ // Decode
+ result, err := DecodeResponse(methodDesc, data)
+ if err != nil {
+ t.Fatalf("DecodeResponse error = %v", err)
+ }
+
+ // Verify
+ val, ok := result.GetFieldString("message")
+ if !ok {
+ t.Error("GetFieldString(message) returned false")
+ }
+ if val != "response_decode_test" {
+ t.Errorf("message = %q, want %q", val, "response_decode_test")
+ }
+}
+
+func TestEncodeMessage(t *testing.T) {
+ methodDesc := buildTestMethodDescriptor(t)
+ inputType := methodDesc.Input()
+
+ dynMsg := dynamicpb.NewMessage(inputType)
+ nameField := inputType.Fields().ByName("name")
+ dynMsg.Set(nameField, protoreflect.ValueOfString("encode_test"))
+
+ dm := &DynamicMessage{
+ Message: dynMsg,
+ Descriptor: inputType,
+ }
+
+ data, err := EncodeMessage(dm)
+ if err != nil {
+ t.Fatalf("EncodeMessage error = %v", err)
+ }
+
+ // Verify by decoding
+ result, err := DecodeRequest(methodDesc, data)
+ if err != nil {
+ t.Fatalf("DecodeRequest verification error = %v", err)
+ }
+
+ val, _ := result.GetFieldString("name")
+ if val != "encode_test" {
+ t.Errorf("After EncodeMessage, name = %q, want %q", val,
"encode_test")
+ }
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
index eb1522cc..7880d878 100644
--- a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
+++ b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
@@ -35,6 +35,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
+
+ "google.golang.org/protobuf/reflect/protoreflect"
)
import (
@@ -54,6 +56,17 @@ const (
defaultConnectTimeout = 5 * time.Second
defaultMaxMsgSize = 4 * 1024 * 1024 // 4MB
defaultHealthCheckInterval = 30 * time.Second
+ defaultDescriptorCacheTTL = 5 * time.Minute
+)
+
+// ReflectionMode defines the mode of gRPC reflection
+const (
+ // ReflectionModePassthrough performs transparent binary proxying
(default, highest performance)
+ ReflectionModePassthrough = "passthrough"
+ // ReflectionModeReflection uses gRPC reflection to decode/encode
messages (content-aware)
+ ReflectionModeReflection = "reflection"
+ // ReflectionModeHybrid tries reflection first, falls back to
passthrough on failure
+ ReflectionModeHybrid = "hybrid"
)
func init() {
@@ -76,13 +89,20 @@ type (
KeepAliveTime time.Duration `yaml:"-" json:"-"`
KeepAliveTimeout time.Duration `yaml:"-" json:"-"`
ConnectTimeout time.Duration `yaml:"-" json:"-"`
+
+ // ReflectionMode: "passthrough" (default), "reflection", or
"hybrid"
+ ReflectionMode string `yaml:"reflection_mode"
json:"reflection_mode" mapstructure:"reflection_mode"`
+ DescriptorCacheTTLStr string
`yaml:"descriptor_cache_ttl" json:"descriptor_cache_ttl"
mapstructure:"descriptor_cache_ttl"`
+ DescriptorCacheTTL time.Duration `yaml:"-" json:"-"`
+ ExtractTripleMetadata bool
`yaml:"extract_triple_metadata" json:"extract_triple_metadata"
mapstructure:"extract_triple_metadata"`
}
// Filter implements the gRPC proxy filter
Filter struct {
- Config *Config
- clientConnPool sync.Map // address -> *grpc.ClientConn
- mu sync.RWMutex // protects concurrent operations
+ Config *Config
+ clientConnPool sync.Map // address -> *grpc.ClientConn
+ mu sync.RWMutex // protects concurrent operations
+ reflectionManager *ReflectionManager
}
)
@@ -102,8 +122,25 @@ func (p Plugin) CreateFilter(config any)
(filter.GrpcFilter, error) {
cfg.KeepAliveTime = parseDurationWithDefault(cfg.KeepAliveTimeStr,
defaultKeepAliveTime)
cfg.KeepAliveTimeout =
parseDurationWithDefault(cfg.KeepAliveTimeoutStr, defaultKeepAliveTimeout)
cfg.ConnectTimeout = parseDurationWithDefault(cfg.ConnectTimeoutStr,
defaultConnectTimeout)
+ cfg.DescriptorCacheTTL =
parseDurationWithDefault(cfg.DescriptorCacheTTLStr, defaultDescriptorCacheTTL)
+
+ // Set default reflection mode if not specified
+ if cfg.ReflectionMode == "" {
+ cfg.ReflectionMode = ReflectionModePassthrough
+ }
+
+ f := &Filter{Config: cfg}
- return &Filter{Config: cfg}, nil
+ // Initialize reflection manager if reflection mode is enabled
+ if cfg.ReflectionMode == ReflectionModeReflection || cfg.ReflectionMode
== ReflectionModeHybrid {
+ f.reflectionManager =
NewReflectionManager(cfg.DescriptorCacheTTL)
+ logger.Infof("gRPC proxy filter initialized with reflection
mode: %s, cache TTL: %s",
+ cfg.ReflectionMode, cfg.DescriptorCacheTTL)
+ } else {
+ logger.Infof("gRPC proxy filter initialized with passthrough
mode")
+ }
+
+ return f, nil
}
// Config Expose the config so that Filter Manger can inject it, so it must be
a pointer
@@ -160,77 +197,98 @@ func (f *Filter) Handle(ctx *grpcCtx.GrpcContext)
filter.FilterStatus {
// handleStream handles all types of gRPC calls by creating a full-duplex
stream pipe.
func (f *Filter) handleStream(ctx *grpcCtx.GrpcContext, address string)
filter.FilterStatus {
- // Get or create connection
conn, err := f.getOrCreateConnection(address)
if err != nil {
ctx.SetError(errors.Errorf("gRPC proxy failed to get
connection: %v", err))
return filter.Stop
}
- // Set metadata for the outgoing context
md := make(metadata.MD)
for k, v := range ctx.Attachments {
if str, ok := v.(string); ok {
md.Set(k, str)
}
}
- outCtx := metadata.NewOutgoingContext(ctx.Context, md)
- // Create the full method path for the gRPC call
+ if f.Config.ExtractTripleMetadata {
+ tripleMeta := ExtractTripleMetadata(ctx.Attachments)
+ if len(tripleMeta) > 0 {
+ ctx.SetAttachment("_triple_metadata", tripleMeta)
+ logger.Debugf("Extracted Triple metadata: %v",
tripleMeta)
+ }
+ }
+
+ outCtx := metadata.NewOutgoingContext(ctx.Context, md)
fullMethod := ctx.ServiceName + "/" + ctx.MethodName
- // logger.Debugf("[dubbo-go-pixiu] gRPC proxy bidirectional stream to
%s", fullMethod)
- // Create a new client stream to the backend
+ var codecOpt grpc.CallOption
+ var useReflection bool
+
+ if f.reflectionManager != nil && f.Config.ReflectionMode !=
ReflectionModePassthrough {
+ methodDesc, err := f.reflectionManager.GetMethodDescriptor(
+ ctx.Context, conn, address, ctx.ServiceName,
ctx.MethodName)
+ if err != nil {
+ if f.Config.ReflectionMode == ReflectionModeHybrid {
+ logger.Warnf("Reflection failed for %s, falling
back to passthrough: %v", fullMethod, err)
+ codecOpt = grpc.ForceCodec(ptcodec.Codec{})
+ useReflection = false
+ } else {
+ ctx.SetError(errors.Wrapf(err, "reflection
failed for %s", fullMethod))
+ return filter.Stop
+ }
+ } else {
+ ctx.SetAttachment("_method_descriptor", methodDesc)
+ ctx.SetAttachment("_is_client_streaming",
methodDesc.IsStreamingClient())
+ logger.Debugf("Reflection succeeded for %s, input: %s,
output: %s, clientStream: %v",
+ fullMethod, methodDesc.Input().FullName(),
methodDesc.Output().FullName(), methodDesc.IsStreamingClient())
+ codecOpt = grpc.ForceCodec(NewDynamicCodec(methodDesc))
+ useReflection = true
+ }
+ } else {
+ codecOpt = grpc.ForceCodec(ptcodec.Codec{})
+ useReflection = false
+ }
+
+ ctx.SetAttachment("_use_reflection", useReflection)
+
clientStream, err := conn.NewStream(outCtx, &grpc.StreamDesc{
StreamName: ctx.MethodName,
ServerStreams: true,
ClientStreams: true,
- }, fullMethod, grpc.ForceCodec(ptcodec.Codec{}))
-
+ }, fullMethod, codecOpt)
if err != nil {
ctx.SetError(errors.Errorf("failed to create client stream:
%v", err))
return filter.Stop
}
- // Ensure there is a server stream to work with
if ctx.Stream == nil {
ctx.SetError(errors.New("no stream available in context"))
return filter.Stop
}
- // Use a WaitGroup to coordinate the two forwarding goroutines
var wg sync.WaitGroup
wg.Add(2)
-
- // Channels for error propagation and termination signaling
errChan := make(chan error, 2)
doneChan := make(chan struct{})
- // Start forwarding data in both directions
go f.forwardClientToServer(ctx, clientStream, &wg, errChan, doneChan)
go f.forwardServerToClient(ctx, clientStream, &wg, errChan, doneChan)
- // Goroutine to wait for context cancellation or the first error
go func() {
select {
case <-ctx.Context.Done():
- // If the client context is canceled, signal the
forwarding goroutines to stop
close(doneChan)
case err := <-errChan:
- // If an error occurs, propagate it and signal
termination
ctx.SetError(err)
close(doneChan)
}
}()
- // Wait for both forwarding goroutines to complete
wg.Wait()
- close(errChan) // Close channel to allow the final error check to
complete
+ close(errChan)
- // Final check for any errors that might have occurred
for err := range errChan {
if err != nil && ctx.Error == nil {
- // Set error if one hasn't been set already
ctx.SetError(err)
}
}
@@ -249,7 +307,24 @@ func (f *Filter) handleStream(ctx *grpcCtx.GrpcContext,
address string) filter.F
func (f *Filter) forwardClientToServer(ctx *grpcCtx.GrpcContext, clientStream
grpc.ClientStream, wg *sync.WaitGroup, errChan chan<- error, doneChan <-chan
struct{}) {
defer wg.Done()
- // Send initial arguments if available (for unary and server-stream
calls)
+ useReflection := false
+ if val, ok := ctx.GetAttachment("_use_reflection"); ok {
+ useReflection, _ = val.(bool)
+ }
+
+ var methodDesc protoreflect.MethodDescriptor
+ if useReflection {
+ if md, ok := ctx.GetAttachment("_method_descriptor"); ok {
+ methodDesc, _ = md.(protoreflect.MethodDescriptor)
+ }
+ }
+
+ // Check if this is a client-streaming call (client-stream or
bidirectional)
+ isClientStreaming := false
+ if val, ok := ctx.GetAttachment("_is_client_streaming"); ok {
+ isClientStreaming, _ = val.(bool)
+ }
+
if len(ctx.Arguments) > 0 {
for _, arg := range ctx.Arguments {
if err := clientStream.SendMsg(arg); err != nil {
@@ -259,17 +334,46 @@ func (f *Filter) forwardClientToServer(ctx
*grpcCtx.GrpcContext, clientStream gr
}
}
- // Continuously forward messages from the client stream
+ // For unary/server-stream: read one message then CloseSend to prevent
deadlock
+ if !isClientStreaming {
+ var msg []byte
+ if err := ctx.Stream.RecvMsg(&msg); err != nil {
+ if err == io.EOF {
+ if err := clientStream.CloseSend(); err != nil {
+ logger.Errorf("Error closing send
stream to backend: %v", err)
+ }
+ return
+ }
+ errChan <- errors.Wrap(err, "error receiving from
client")
+ return
+ }
+
+ if methodDesc != nil {
+ if decoded, err := DecodeRequest(methodDesc, msg); err
== nil {
+ logger.Debugf("Decoded request message: %v",
decoded.Message)
+ }
+ }
+
+ if err := clientStream.SendMsg(msg); err != nil {
+ errChan <- errors.Wrap(err, "error forwarding to
backend")
+ return
+ }
+
+ if err := clientStream.CloseSend(); err != nil {
+ logger.Errorf("Error closing send stream to backend:
%v", err)
+ }
+ return
+ }
+
+ // Client-streaming: continuously forward messages until EOF
for {
select {
case <-doneChan:
- // Stop forwarding if the done signal is received
return
default:
var msg []byte
if err := ctx.Stream.RecvMsg(&msg); err != nil {
if err == io.EOF {
- // Client has finished sending, so
close the send direction of the backend stream
if err := clientStream.CloseSend(); err
!= nil {
logger.Errorf("Error closing
send stream to backend: %v", err)
}
@@ -279,6 +383,12 @@ func (f *Filter) forwardClientToServer(ctx
*grpcCtx.GrpcContext, clientStream gr
return
}
+ if methodDesc != nil {
+ if decoded, err := DecodeRequest(methodDesc,
msg); err == nil {
+ logger.Debugf("Decoded request message:
%v", decoded.Message)
+ }
+ }
+
if err := clientStream.SendMsg(msg); err != nil {
errChan <- errors.Wrap(err, "error forwarding
to backend")
return
@@ -291,7 +401,13 @@ func (f *Filter) forwardClientToServer(ctx
*grpcCtx.GrpcContext, clientStream gr
func (f *Filter) forwardServerToClient(ctx *grpcCtx.GrpcContext, clientStream
grpc.ClientStream, wg *sync.WaitGroup, errChan chan<- error, doneChan <-chan
struct{}) {
defer wg.Done()
- // Forward header metadata from backend to client
+ var methodDesc protoreflect.MethodDescriptor
+ if val, ok := ctx.GetAttachment("_use_reflection"); ok && val.(bool) {
+ if md, ok := ctx.GetAttachment("_method_descriptor"); ok {
+ methodDesc, _ = md.(protoreflect.MethodDescriptor)
+ }
+ }
+
if header, err := clientStream.Header(); err == nil {
if s, ok := ctx.Stream.(grpc.ServerStream); ok {
s.SetHeader(header)
@@ -301,23 +417,26 @@ func (f *Filter) forwardServerToClient(ctx
*grpcCtx.GrpcContext, clientStream gr
for {
select {
case <-doneChan:
- // Stop forwarding if the done signal is received
return
default:
var resp []byte
err := clientStream.RecvMsg(&resp)
if err != nil {
- // Upon any error from the backend, including
EOF, forward the trailer metadata
if s, ok := ctx.Stream.(grpc.ServerStream); ok {
s.SetTrailer(clientStream.Trailer())
}
if err != io.EOF {
- // Propagate the actual gRPC status
error, but not EOF
errChan <- err
}
return
}
+ if methodDesc != nil {
+ if decoded, err := DecodeResponse(methodDesc,
resp); err == nil {
+ logger.Debugf("Decoded response
message: %v", decoded.Message)
+ }
+ }
+
if err := ctx.Stream.SendMsg(resp); err != nil {
errChan <- errors.Wrap(err, "failed to forward
response to client")
return
@@ -496,6 +615,12 @@ func (f *Filter) createTLSCredentials()
(credentials.TransportCredentials, error
func (f *Filter) Close() error {
logger.Info("Closing gRPC proxy filter and all connections")
+ // Close reflection manager if initialized
+ if f.reflectionManager != nil {
+ f.reflectionManager.Close()
+ logger.Info("Reflection manager closed")
+ }
+
var wg sync.WaitGroup
var closeErrors []error
var errorMu sync.Mutex
diff --git
a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go
new file mode 100644
index 00000000..1343ad9c
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "testing"
+ "time"
+)
+
+func TestParseDurationWithDefault(t *testing.T) {
+ tests := []struct {
+ name string
+ durationStr string
+ defaultVal time.Duration
+ expected time.Duration
+ }{
+ {
+ name: "empty string uses default",
+ durationStr: "",
+ defaultVal: 30 * time.Second,
+ expected: 30 * time.Second,
+ },
+ {
+ name: "valid duration seconds",
+ durationStr: "10s",
+ defaultVal: 30 * time.Second,
+ expected: 10 * time.Second,
+ },
+ {
+ name: "valid duration minutes",
+ durationStr: "5m",
+ defaultVal: 30 * time.Second,
+ expected: 5 * time.Minute,
+ },
+ {
+ name: "valid duration hours",
+ durationStr: "2h",
+ defaultVal: 30 * time.Second,
+ expected: 2 * time.Hour,
+ },
+ {
+ name: "valid duration milliseconds",
+ durationStr: "500ms",
+ defaultVal: 1 * time.Second,
+ expected: 500 * time.Millisecond,
+ },
+ {
+ name: "valid complex duration",
+ durationStr: "1h30m",
+ defaultVal: 30 * time.Second,
+ expected: 1*time.Hour + 30*time.Minute,
+ },
+ {
+ name: "invalid duration uses default",
+ durationStr: "invalid",
+ defaultVal: 45 * time.Second,
+ expected: 45 * time.Second,
+ },
+ {
+ name: "number without unit uses default",
+ durationStr: "100",
+ defaultVal: 20 * time.Second,
+ expected: 20 * time.Second,
+ },
+ {
+ name: "negative duration",
+ durationStr: "-5s",
+ defaultVal: 10 * time.Second,
+ expected: -5 * time.Second,
+ },
+ {
+ name: "zero duration",
+ durationStr: "0s",
+ defaultVal: 10 * time.Second,
+ expected: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := parseDurationWithDefault(tt.durationStr,
tt.defaultVal)
+ if result != tt.expected {
+ t.Errorf("parseDurationWithDefault(%q, %v) =
%v, want %v",
+ tt.durationStr, tt.defaultVal, result,
tt.expected)
+ }
+ })
+ }
+}
+
+func TestPlugin_Kind(t *testing.T) {
+ plugin := Plugin{}
+ expected := Kind
+
+ if got := plugin.Kind(); got != expected {
+ t.Errorf("Kind() = %q, want %q", got, expected)
+ }
+}
+
+func TestReflectionModeConstants(t *testing.T) {
+ // Verify reflection mode constants are defined correctly
+ tests := []struct {
+ name string
+ mode string
+ expected string
+ }{
+ {"passthrough mode", ReflectionModePassthrough, "passthrough"},
+ {"reflection mode", ReflectionModeReflection, "reflection"},
+ {"hybrid mode", ReflectionModeHybrid, "hybrid"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.mode != tt.expected {
+ t.Errorf("%s = %q, want %q", tt.name, tt.mode,
tt.expected)
+ }
+ })
+ }
+}
+
+func TestConfig_DefaultValues(t *testing.T) {
+ // Test that config can be created with default values
+ config := &Config{}
+
+ // Verify default empty values
+ if config.ReflectionMode != "" {
+ t.Errorf("default ReflectionMode = %q, want empty",
config.ReflectionMode)
+ }
+ if config.DescriptorCacheTTLStr != "" {
+ t.Errorf("default DescriptorCacheTTLStr = %q, want empty",
config.DescriptorCacheTTLStr)
+ }
+ if config.ExtractTripleMetadata {
+ t.Error("default ExtractTripleMetadata should be false")
+ }
+}
+
+func TestConfig_WithReflectionSettings(t *testing.T) {
+ config := &Config{
+ ReflectionMode: ReflectionModeReflection,
+ DescriptorCacheTTLStr: "10m",
+ ExtractTripleMetadata: true,
+ }
+
+ if config.ReflectionMode != "reflection" {
+ t.Errorf("ReflectionMode = %q, want %q", config.ReflectionMode,
"reflection")
+ }
+
+ // Parse the TTL
+ ttl := parseDurationWithDefault(config.DescriptorCacheTTLStr,
5*time.Minute)
+ if ttl != 10*time.Minute {
+ t.Errorf("parsed TTL = %v, want %v", ttl, 10*time.Minute)
+ }
+
+ if !config.ExtractTripleMetadata {
+ t.Error("ExtractTripleMetadata should be true")
+ }
+}
+
+func TestFilter_Close_NilReflectionManager(t *testing.T) {
+ // Test that Close handles nil reflection manager gracefully
+ filter := &Filter{
+ Config: &Config{},
+ reflectionManager: nil,
+ }
+
+ // Should not panic
+ filter.Close()
+}
+
+func TestFilter_Close_WithReflectionManager(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ filter := &Filter{
+ Config: &Config{},
+ reflectionManager: rm,
+ }
+
+ // Should not panic and should close the manager
+ filter.Close()
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/protocol.go
b/pkg/filter/network/grpcproxy/filter/proxy/protocol.go
new file mode 100644
index 00000000..70e0193c
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/protocol.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "strings"
+)
+
+// Triple-specific header prefix
+const (
+ TripleHeaderPrefix = "tri-"
+)
+
+// ExtractTripleMetadata extracts Triple-specific metadata from attachments.
+// This is used to extract tri-* headers for routing decisions based on
+// service version, group, etc. Note: This filter is only used by gRPC
listener,
+// which handles gRPC protocol. Triple protocol should use triple listener.
+// This function is kept for potential cross-protocol scenarios where Triple
+// headers might be present in gRPC requests for routing purposes.
+func ExtractTripleMetadata(attachments map[string]any) map[string]string {
+ meta := make(map[string]string)
+ for k, v := range attachments {
+ key := strings.ToLower(k)
+ if strings.HasPrefix(key, TripleHeaderPrefix) {
+ if str, ok := v.(string); ok {
+ meta[key] = str
+ }
+ }
+ }
+ return meta
+}
+
+// IsTripleHeader checks if a header key is Triple-specific
+func IsTripleHeader(key string) bool {
+ return strings.HasPrefix(strings.ToLower(key), TripleHeaderPrefix)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go
new file mode 100644
index 00000000..6cff9437
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "testing"
+)
+
+func TestExtractTripleMetadata(t *testing.T) {
+ tests := []struct {
+ name string
+ attachments map[string]any
+ expected map[string]string
+ }{
+ {
+ name: "with triple headers",
+ attachments: map[string]any{
+ "tri-service-version": "1.0.0",
+ "tri-service-group": "production",
+ "tri-req-id": "12345",
+ "other-header": "value",
+ },
+ expected: map[string]string{
+ "tri-service-version": "1.0.0",
+ "tri-service-group": "production",
+ "tri-req-id": "12345",
+ },
+ },
+ {
+ name: "no triple headers",
+ attachments: map[string]any{
+ "content-type": "application/grpc",
+ "authorization": "Bearer token",
+ },
+ expected: map[string]string{},
+ },
+ {
+ name: "nil attachments",
+ attachments: nil,
+ expected: map[string]string{},
+ },
+ {
+ name: "empty attachments",
+ attachments: map[string]any{},
+ expected: map[string]string{},
+ },
+ {
+ name: "mixed case headers",
+ attachments: map[string]any{
+ "Tri-Service-Version": "2.0.0",
+ "TRI-SERVICE-GROUP": "staging",
+ },
+ expected: map[string]string{
+ "tri-service-version": "2.0.0",
+ "tri-service-group": "staging",
+ },
+ },
+ {
+ name: "non-string values ignored",
+ attachments: map[string]any{
+ "tri-service-version": "1.0.0",
+ "tri-count": 123, // non-string,
should be ignored
+ },
+ expected: map[string]string{
+ "tri-service-version": "1.0.0",
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := ExtractTripleMetadata(tt.attachments)
+ if len(result) != len(tt.expected) {
+ t.Errorf("ExtractTripleMetadata() returned %d
items, want %d", len(result), len(tt.expected))
+ }
+ for k, v := range tt.expected {
+ if result[k] != v {
+ t.Errorf("ExtractTripleMetadata()[%q] =
%q, want %q", k, result[k], v)
+ }
+ }
+ })
+ }
+}
+
+func TestIsTripleHeader(t *testing.T) {
+ tests := []struct {
+ name string
+ key string
+ expected bool
+ }{
+ {"lowercase tri prefix", "tri-service-version", true},
+ {"uppercase TRI prefix", "TRI-SERVICE-VERSION", true},
+ {"mixed case", "Tri-Service-Group", true},
+ {"no tri prefix", "content-type", false},
+ {"partial match", "triangle", false},
+ {"empty string", "", false},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := IsTripleHeader(tt.key)
+ if result != tt.expected {
+ t.Errorf("IsTripleHeader(%q) = %v, want %v",
tt.key, result, tt.expected)
+ }
+ })
+ }
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go
b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go
new file mode 100644
index 00000000..9da1621f
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+)
+
+import (
+ "github.com/pkg/errors"
+
+ "golang.org/x/sync/singleflight"
+
+ "google.golang.org/grpc"
+ rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+
+ "google.golang.org/protobuf/proto"
+
+ "google.golang.org/protobuf/reflect/protodesc"
+ "google.golang.org/protobuf/reflect/protoreflect"
+ "google.golang.org/protobuf/reflect/protoregistry"
+
+ "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+const (
+ defaultDescCacheTTL = 5 * time.Minute
+ reflectionTimeout = 10 * time.Second
+ // versionHashLength is the length of truncated MD5 hash used for
version detection.
+ // 16 hex characters (64 bits) provide sufficient uniqueness for cache
invalidation
+ // while keeping the hash compact for logging and comparison.
+ versionHashLength = 16
+)
+
+// ReflectionVersion specifies which version of the reflection API to use
+type ReflectionVersion string
+
+const (
+ // ReflectionV1Alpha uses the v1alpha reflection API
+ ReflectionV1Alpha ReflectionVersion = "v1alpha"
+ // ReflectionV1 uses the v1 reflection API (not yet implemented,
reserved)
+ ReflectionV1 ReflectionVersion = "v1"
+ // ReflectionAuto attempts to detect the available version
+ ReflectionAuto ReflectionVersion = "auto"
+)
+
+// ReflectionConfig holds configuration for the reflection manager
+type ReflectionConfig struct {
+ CacheTTL time.Duration
+ MaxCacheSize int
+ ReflectionVersion ReflectionVersion
+ ContinueOnError bool // Continue with partial results on dependency
resolution failure
+}
+
+// ReflectionCacheStats holds cache statistics for the reflection manager
+type ReflectionCacheStats struct {
+ MethodCacheSize int `json:"method_cache_size"`
+ MethodCacheHits int64 `json:"method_cache_hits"`
+ MethodCacheMisses int64 `json:"method_cache_misses"`
+ MethodCacheEvictions int64 `json:"method_cache_evictions"`
+ MethodCacheHitRatio float64 `json:"method_cache_hit_ratio"`
+ FileRegistryCount int `json:"file_registry_count"`
+ TTLSeconds float64 `json:"ttl_secs"`
+ MaxCacheSize int `json:"max_cache_size"`
+}
+
+// fileRegistryWithMetadata holds a file registry with metadata for version
tracking
+type fileRegistryWithMetadata struct {
+ files *protoregistry.Files
+ versionHash string
+ timestamp time.Time
+}
+
+// ReflectionManager manages gRPC reflection clients and descriptor caching
+// using official google.golang.org/protobuf libraries
+type ReflectionManager struct {
+ cache *DescriptorCache
+ cacheTTL time.Duration
+ config ReflectionConfig
+ // fileDescCache caches file descriptors per address with metadata
+ fileDescCache sync.Map // address -> *fileRegistryWithMetadata
+ // fileRegistryGroup uses singleflight to ensure only one goroutine
+ // performs reflection for each address at a time
+ fileRegistryGroup singleflight.Group
+ // Track missing dependencies per address for monitoring
+ missingDeps sync.Map // address -> []string
+}
+
+// NewReflectionManager creates a new reflection manager with default config
+func NewReflectionManager(cacheTTL time.Duration) *ReflectionManager {
+ return NewReflectionManagerWithConfig(ReflectionConfig{
+ CacheTTL: cacheTTL,
+ MaxCacheSize: defaultMaxCacheSize,
+ ReflectionVersion: ReflectionV1Alpha,
+ ContinueOnError: true,
+ })
+}
+
+// NewReflectionManagerWithConfig creates a new reflection manager with custom
config
+func NewReflectionManagerWithConfig(config ReflectionConfig)
*ReflectionManager {
+ if config.CacheTTL <= 0 {
+ config.CacheTTL = defaultDescCacheTTL
+ }
+ if config.MaxCacheSize < minCacheSize {
+ config.MaxCacheSize = minCacheSize
+ }
+ if config.ReflectionVersion == "" {
+ config.ReflectionVersion = ReflectionV1Alpha
+ }
+
+ return &ReflectionManager{
+ cache: NewDescriptorCacheWithSize(config.CacheTTL,
config.MaxCacheSize),
+ cacheTTL: config.CacheTTL,
+ config: config,
+ }
+}
+
+// GetMethodDescriptor retrieves a method descriptor using gRPC reflection
+// Results are cached for improved performance
+func (rm *ReflectionManager) GetMethodDescriptor(
+ ctx context.Context,
+ conn *grpc.ClientConn,
+ address string,
+ serviceName string,
+ methodName string,
+) (protoreflect.MethodDescriptor, error) {
+ // Build cache key
+ cacheKey := BuildCacheKey(address, serviceName, methodName)
+
+ // Check cache first (with version check)
+ if cached := rm.cache.Get(cacheKey); cached != nil {
+ logger.Debugf("Reflection cache hit for %s", cacheKey)
+ return cached, nil
+ }
+
+ logger.Debugf("Reflection cache miss for %s, performing reflection",
cacheKey)
+
+ // Perform reflection with timeout
+ reflectCtx, cancel := context.WithTimeout(ctx, reflectionTimeout)
+ defer cancel()
+
+ // Get or create file registry for this address
+ files, versionHash, err := rm.getOrCreateFileRegistry(reflectCtx, conn,
address, serviceName)
+ if err != nil {
+ return nil, err
+ }
+
+ // Find service descriptor
+ serviceDesc, err :=
files.FindDescriptorByName(protoreflect.FullName(serviceName))
+ if err != nil {
+ return nil, errors.Wrapf(err, "failed to find service %s",
serviceName)
+ }
+
+ svcDesc, ok := serviceDesc.(protoreflect.ServiceDescriptor)
+ if !ok {
+ return nil, fmt.Errorf("%s is not a service", serviceName)
+ }
+
+ // Find method descriptor
+ methodDesc := svcDesc.Methods().ByName(protoreflect.Name(methodName))
+ if methodDesc == nil {
+ return nil, fmt.Errorf("method %s not found in service %s",
methodName, serviceName)
+ }
+
+ // Cache the result with version hash
+ rm.cache.SetWithVersion(cacheKey, methodDesc, versionHash)
+ logger.Debugf("Cached method descriptor for %s (version: %s)",
cacheKey, versionHash)
+
+ return methodDesc, nil
+}
+
+// getOrCreateFileRegistry gets or creates a file registry for the given
address
+// Uses singleflight to ensure only one goroutine performs reflection per
address
+// Returns the files registry and version hash for change detection
+func (rm *ReflectionManager) getOrCreateFileRegistry(
+ ctx context.Context,
+ conn *grpc.ClientConn,
+ address string,
+ serviceName string,
+) (*protoregistry.Files, string, error) {
+ // Check cache first (fast path, no locking)
+ if cached, ok := rm.fileDescCache.Load(address); ok {
+ reg := cached.(*fileRegistryWithMetadata)
+ return reg.files, reg.versionHash, nil
+ }
+
+ // Use singleflight to deduplicate concurrent reflection requests
+ // for the same address. This prevents thundering herd problem.
+ result, err, shared := rm.fileRegistryGroup.Do(address, func() (any,
error) {
+ // Double-check cache in case another goroutine just filled it
+ if cached, ok := rm.fileDescCache.Load(address); ok {
+ reg := cached.(*fileRegistryWithMetadata)
+ return registryWithVersion{files: reg.files,
versionHash: reg.versionHash}, nil
+ }
+
+ // Create reflection client based on configured version
+ client := rpb.NewServerReflectionClient(conn)
+ stream, err := client.ServerReflectionInfo(ctx)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to create
reflection stream")
+ }
+ defer stream.CloseSend()
+
+ // Request file descriptor for the service
+ fileDescs, missingDeps, err :=
rm.resolveServiceFileDescriptors(stream, serviceName)
+ if err != nil && !rm.config.ContinueOnError {
+ return nil, err
+ }
+
+ // Log missing dependencies if any
+ if len(missingDeps) > 0 {
+ logger.Warnf("Missing dependencies for %s: %v",
address, missingDeps)
+ rm.missingDeps.Store(address, missingDeps)
+ }
+
+ // Compute version hash from all file descriptors
+ versionHash := rm.computeVersionHash(fileDescs)
+
+ // Build file registry
+ files, err := rm.buildFileRegistry(fileDescs)
+ if err != nil && !rm.config.ContinueOnError {
+ return nil, err
+ }
+
+ // Cache the registry with metadata for future requests
+ rm.fileDescCache.Store(address, &fileRegistryWithMetadata{
+ files: files,
+ versionHash: versionHash,
+ timestamp: time.Now(),
+ })
+
+ return registryWithVersion{files: files, versionHash:
versionHash}, nil
+ })
+
+ if err != nil {
+ return nil, "", err
+ }
+
+ if shared {
+ logger.Debugf("Reflection request for %s shared with another
goroutine", address)
+ }
+
+ rw := result.(registryWithVersion)
+ return rw.files, rw.versionHash, nil
+}
+
+// registryWithVersion is a helper type to return both files and version hash
from singleflight
+type registryWithVersion struct {
+ files *protoregistry.Files
+ versionHash string
+}
+
+// resolveServiceFileDescriptors resolves all file descriptors needed for a
service
+// Returns file descriptors, list of missing dependencies, and error
+func (rm *ReflectionManager) resolveServiceFileDescriptors(
+ stream rpb.ServerReflection_ServerReflectionInfoClient,
+ serviceName string,
+) ([]*descriptorpb.FileDescriptorProto, []string, error) {
+ // Request file containing the service
+ req := &rpb.ServerReflectionRequest{
+ MessageRequest:
&rpb.ServerReflectionRequest_FileContainingSymbol{
+ FileContainingSymbol: serviceName,
+ },
+ }
+
+ if err := stream.Send(req); err != nil {
+ return nil, nil, errors.Wrap(err, "failed to send reflection
request")
+ }
+
+ resp, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ return nil, nil, errors.New("unexpected EOF when
receiving reflection response")
+ }
+ return nil, nil, errors.Wrap(err, "failed to receive reflection
response")
+ }
+
+ fdResp := resp.GetFileDescriptorResponse()
+ if fdResp == nil {
+ if errResp := resp.GetErrorResponse(); errResp != nil {
+ return nil, nil, fmt.Errorf("reflection error: %s",
errResp.ErrorMessage)
+ }
+ return nil, nil, errors.New("unexpected reflection response
type")
+ }
+
+ // Parse file descriptors
+ fileDescs := make([]*descriptorpb.FileDescriptorProto, 0,
len(fdResp.FileDescriptorProto))
+ for _, fdBytes := range fdResp.FileDescriptorProto {
+ fd := &descriptorpb.FileDescriptorProto{}
+ if err := proto.Unmarshal(fdBytes, fd); err != nil {
+ return nil, nil, errors.Wrap(err, "failed to unmarshal
file descriptor")
+ }
+ fileDescs = append(fileDescs, fd)
+ }
+
+ // Resolve dependencies recursively
+ resolved := make(map[string]bool)
+ missingDeps := make([]string, 0)
+ for _, fd := range fileDescs {
+ resolved[fd.GetName()] = true
+ }
+
+ for _, fd := range fileDescs {
+ deps, missing, err := rm.resolveDependencies(stream,
fd.GetDependency(), resolved)
+ if err != nil {
+ if !rm.config.ContinueOnError {
+ return nil, nil, err
+ }
+ logger.Warnf("Failed to resolve dependencies for %s:
%v", fd.GetName(), err)
+ }
+ missingDeps = append(missingDeps, missing...)
+ fileDescs = append(fileDescs, deps...)
+ }
+
+ return fileDescs, missingDeps, nil
+}
+
+// resolveDependencies resolves file descriptor dependencies
+// Returns resolved descriptors, list of missing dependencies, and error
+func (rm *ReflectionManager) resolveDependencies(
+ stream rpb.ServerReflection_ServerReflectionInfoClient,
+ deps []string,
+ resolved map[string]bool,
+) ([]*descriptorpb.FileDescriptorProto, []string, error) {
+ var result []*descriptorpb.FileDescriptorProto
+ var missingDeps []string
+
+ for _, dep := range deps {
+ if resolved[dep] {
+ continue
+ }
+
+ req := &rpb.ServerReflectionRequest{
+ MessageRequest:
&rpb.ServerReflectionRequest_FileByFilename{
+ FileByFilename: dep,
+ },
+ }
+
+ if err := stream.Send(req); err != nil {
+ if err == io.EOF {
+ // Stream closed by server, return what we have
collected so far
+ logger.Debugf("Stream EOF during send for
dependency %s", dep)
+ return result, missingDeps, nil
+ }
+ if !rm.config.ContinueOnError {
+ return nil, nil, errors.Wrapf(err, "failed to
send request for %s", dep)
+ }
+ logger.Warnf("Failed to send request for dependency %s:
%v", dep, err)
+ missingDeps = append(missingDeps, dep)
+ continue
+ }
+
+ resp, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ // Stream closed by server, return what we have
collected so far
+ logger.Debugf("Stream EOF during recv for
dependency %s", dep)
+ return result, missingDeps, nil
+ }
+ if !rm.config.ContinueOnError {
+ return nil, nil, errors.Wrapf(err, "failed to
receive response for %s", dep)
+ }
+ logger.Warnf("Failed to receive response for dependency
%s: %v", dep, err)
+ missingDeps = append(missingDeps, dep)
+ continue
+ }
+
+ fdResp := resp.GetFileDescriptorResponse()
+ if fdResp == nil {
+ // No file descriptor in response, could be an error
response or unexpected type
+ if errResp := resp.GetErrorResponse(); errResp != nil {
+ logger.Warnf("Reflection error for dependency
%s: %s", dep, errResp.ErrorMessage)
+ // Mark as missing and continue if
ContinueOnError is enabled
+ if rm.config.ContinueOnError {
+ missingDeps = append(missingDeps, dep)
+ continue
+ }
+ return nil, nil, fmt.Errorf("reflection error
for dependency %s: %s", dep, errResp.ErrorMessage)
+ }
+ // Unexpected response type, skip this dependency
+ missingDeps = append(missingDeps, dep)
+ continue
+ }
+
+ for _, fdBytes := range fdResp.FileDescriptorProto {
+ fd := &descriptorpb.FileDescriptorProto{}
+ if err := proto.Unmarshal(fdBytes, fd); err != nil {
+ if !rm.config.ContinueOnError {
+ return nil, nil, errors.Wrap(err,
"failed to unmarshal dependency")
+ }
+ logger.Warnf("Failed to unmarshal dependency
%s: %v", dep, err)
+ continue
+ }
+ if !resolved[fd.GetName()] {
+ resolved[fd.GetName()] = true
+ result = append(result, fd)
+
+ // Resolve nested dependencies
+ nested, nestedMissing, err :=
rm.resolveDependencies(stream, fd.GetDependency(), resolved)
+ if err != nil {
+ if !rm.config.ContinueOnError {
+ return nil, nil, err
+ }
+ logger.Warnf("Failed to resolve nested
dependencies for %s: %v", fd.GetName(), err)
+ }
+ missingDeps = append(missingDeps,
nestedMissing...)
+ result = append(result, nested...)
+ }
+ }
+ }
+
+ return result, missingDeps, nil
+}
+
+// buildFileRegistry builds a protoregistry.Files from file descriptor protos
+// Continues on error if ContinueOnError is enabled
+func (rm *ReflectionManager) buildFileRegistry(
+ fileDescs []*descriptorpb.FileDescriptorProto,
+) (*protoregistry.Files, error) {
+ files := new(protoregistry.Files)
+
+ // Build dependency graph and sort topologically
+ sorted, err := rm.topologicalSort(fileDescs)
+ if err != nil {
+ if !rm.config.ContinueOnError {
+ return nil, err
+ }
+ logger.Warnf("Topological sort failed, using original order:
%v", err)
+ sorted = fileDescs
+ }
+
+ // Register files in dependency order
+ registeredCount := 0
+ for _, fd := range sorted {
+ fileDesc, err := protodesc.NewFile(fd, files)
+ if err != nil {
+ // Skip files that fail to register (might be already
registered or missing deps)
+ logger.Debugf("Skipping file %s: %v", fd.GetName(), err)
+ continue
+ }
+ if err := files.RegisterFile(fileDesc); err != nil {
+ // Skip already registered files
+ logger.Debugf("Skipping duplicate file %s: %v",
fd.GetName(), err)
+ continue
+ }
+ registeredCount++
+ }
+
+ logger.Debugf("Registered %d/%d file descriptors successfully",
registeredCount, len(sorted))
+
+ if registeredCount == 0 && !rm.config.ContinueOnError {
+ return nil, errors.New("failed to register any file
descriptors")
+ }
+
+ return files, nil
+}
+
+// topologicalSort sorts file descriptors by dependency order
+func (rm *ReflectionManager) topologicalSort(
+ fileDescs []*descriptorpb.FileDescriptorProto,
+) ([]*descriptorpb.FileDescriptorProto, error) {
+ // Build a map for quick lookup
+ fdMap := make(map[string]*descriptorpb.FileDescriptorProto)
+ for _, fd := range fileDescs {
+ fdMap[fd.GetName()] = fd
+ }
+
+ visited := make(map[string]bool)
+ inStack := make(map[string]bool)
+ var result []*descriptorpb.FileDescriptorProto
+
+ var visit func(name string) error
+ visit = func(name string) error {
+ if inStack[name] {
+ // Circular dependency detected
+ logger.Warnf("Circular dependency detected involving:
%s", name)
+ // Don't return error, just skip to avoid infinite loop
+ return nil
+ }
+ if visited[name] {
+ return nil
+ }
+
+ fd, ok := fdMap[name]
+ if !ok {
+ // External dependency, skip
+ return nil
+ }
+
+ inStack[name] = true
+ for _, dep := range fd.GetDependency() {
+ if err := visit(dep); err != nil {
+ return err
+ }
+ }
+ inStack[name] = false
+ visited[name] = true
+ result = append(result, fd)
+ return nil
+ }
+
+ for name := range fdMap {
+ if err := visit(name); err != nil {
+ return nil, err
+ }
+ }
+
+ return result, nil
+}
+
+// computeVersionHash computes a hash from file descriptors for version
detection
+func (rm *ReflectionManager) computeVersionHash(fileDescs
[]*descriptorpb.FileDescriptorProto) string {
+ h := md5.New()
+
+ // Sort file names for consistent hashing
+ names := make([]string, len(fileDescs))
+ for i, fd := range fileDescs {
+ names[i] = fd.GetName()
+ }
+
+ // Write names and sizes to hash
+ for _, name := range names {
+ h.Write([]byte(name))
+ h.Write([]byte{0}) // null terminator
+ }
+
+ return fmt.Sprintf("%x", h.Sum(nil))[:versionHashLength]
+}
+
+// InvalidateCache removes cached descriptors for a specific address
+func (rm *ReflectionManager) InvalidateCache(address string) {
+ rm.fileDescCache.Delete(address)
+ // Forget the singleflight key to allow new reflection requests
+ rm.fileRegistryGroup.Forget(address)
+ // Clear missing dependencies tracking
+ rm.missingDeps.Delete(address)
+ logger.Infof("Cache invalidation for address: %s", address)
+}
+
+// InvalidateByVersion removes all cached entries with a specific version hash
+func (rm *ReflectionManager) InvalidateByVersion(versionHash string) int {
+ count := 0
+ rm.fileDescCache.Range(func(key, value any) bool {
+ reg := value.(*fileRegistryWithMetadata)
+ if reg.versionHash == versionHash {
+ rm.fileDescCache.Delete(key)
+ if address, ok := key.(string); ok {
+ rm.fileRegistryGroup.Forget(address)
+ }
+ count++
+ }
+ return true
+ })
+ // Also invalidate method cache with this version
+ count += rm.cache.InvalidateByVersion(versionHash)
+ if count > 0 {
+ logger.Infof("Invalidated %d cache entries with version hash:
%s", count, versionHash)
+ }
+ return count
+}
+
+// ClearCache clears all cached descriptors
+func (rm *ReflectionManager) ClearCache() {
+ rm.cache.Clear()
+ // Clear file descriptor cache and singleflight cache
+ rm.fileDescCache.Range(func(key, _ any) bool {
+ rm.fileDescCache.Delete(key)
+ // Forget the singleflight key to allow new reflection requests
+ if address, ok := key.(string); ok {
+ rm.fileRegistryGroup.Forget(address)
+ }
+ return true
+ })
+ // Clear missing dependencies tracking
+ rm.missingDeps.Range(func(key, _ any) bool {
+ rm.missingDeps.Delete(key)
+ return true
+ })
+ logger.Info("Reflection descriptor cache cleared")
+}
+
+// Close cleans up resources
+func (rm *ReflectionManager) Close() {
+ rm.cache.Close()
+}
+
+// WarmupService pre-fetches and caches descriptors for a specific service.
+// This eliminates cold-start latency for the first request to this service.
+// The conn parameter should be a connection to the backend server.
+func (rm *ReflectionManager) WarmupService(ctx context.Context, conn
*grpc.ClientConn, address, serviceName string) error {
+ _, _, err := rm.getOrCreateFileRegistry(ctx, conn, address, serviceName)
+ if err != nil {
+ return errors.Wrapf(err, "warmup failed for service %s at %s",
serviceName, address)
+ }
+ logger.Infof("Warmup completed for service %s at %s", serviceName,
address)
+ return nil
+}
+
+// WarmupServices pre-fetches and caches descriptors for multiple services
concurrently.
+// Returns a map of service names to errors (nil if successful).
+func (rm *ReflectionManager) WarmupServices(ctx context.Context, conn
*grpc.ClientConn, address string, serviceNames []string) map[string]error {
+ results := make(map[string]error)
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+
+ for _, svc := range serviceNames {
+ wg.Add(1)
+ go func(serviceName string) {
+ defer wg.Done()
+ err := rm.WarmupService(ctx, conn, address, serviceName)
+ mu.Lock()
+ results[serviceName] = err
+ mu.Unlock()
+ }(svc)
+ }
+
+ wg.Wait()
+ return results
+}
+
+// GetCacheStats returns cache statistics for monitoring
+func (rm *ReflectionManager) GetCacheStats() ReflectionCacheStats {
+ fileCount := 0
+ rm.fileDescCache.Range(func(_, _ any) bool {
+ fileCount++
+ return true
+ })
+
+ methodStats := rm.cache.GetStats()
+
+ return ReflectionCacheStats{
+ MethodCacheSize: methodStats.Size,
+ MethodCacheHits: methodStats.Hits,
+ MethodCacheMisses: methodStats.Misses,
+ MethodCacheEvictions: methodStats.Evictions,
+ MethodCacheHitRatio: methodStats.HitRatio,
+ FileRegistryCount: fileCount,
+ TTLSeconds: rm.cacheTTL.Seconds(),
+ MaxCacheSize: methodStats.MaxSize,
+ }
+}
+
+// GetMissingDependencies returns missing dependencies for a given address
+func (rm *ReflectionManager) GetMissingDependencies(address string) []string {
+ if missing, ok := rm.missingDeps.Load(address); ok {
+ return missing.([]string)
+ }
+ return nil
+}
+
+// GetAllMissingDependencies returns all missing dependencies across all
addresses
+func (rm *ReflectionManager) GetAllMissingDependencies() map[string][]string {
+ result := make(map[string][]string)
+ rm.missingDeps.Range(func(key, value any) bool {
+ if address, ok := key.(string); ok {
+ result[address] = value.([]string)
+ }
+ return true
+ })
+ return result
+}
+
+// SetConfig updates the reflection manager configuration
+func (rm *ReflectionManager) SetConfig(config ReflectionConfig) {
+ if config.CacheTTL > 0 {
+ rm.cacheTTL = config.CacheTTL
+ rm.config.CacheTTL = config.CacheTTL
+ }
+ if config.MaxCacheSize >= minCacheSize {
+ rm.cache.SetMaxSize(config.MaxCacheSize)
+ rm.config.MaxCacheSize = config.MaxCacheSize
+ }
+ if config.ReflectionVersion != "" {
+ rm.config.ReflectionVersion = config.ReflectionVersion
+ }
+ rm.config.ContinueOnError = config.ContinueOnError
+
+ logger.Infof("Reflection manager config updated: version=%s,
continueOnError=%v",
+ rm.config.ReflectionVersion, rm.config.ContinueOnError)
+}
+
+// GetConfig returns the current configuration
+func (rm *ReflectionManager) GetConfig() ReflectionConfig {
+ return rm.config
+}
diff --git
a/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go
b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go
new file mode 100644
index 00000000..99bfba36
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "testing"
+ "time"
+)
+
+import (
+ "google.golang.org/protobuf/proto"
+
+ "google.golang.org/protobuf/types/descriptorpb"
+)
+
+func TestNewReflectionManager(t *testing.T) {
+ tests := []struct {
+ name string
+ ttl time.Duration
+ expected time.Duration
+ }{
+ {
+ name: "positive TTL",
+ ttl: 10 * time.Minute,
+ expected: 10 * time.Minute,
+ },
+ {
+ name: "zero TTL uses default",
+ ttl: 0,
+ expected: defaultDescCacheTTL,
+ },
+ {
+ name: "negative TTL uses default",
+ ttl: -1 * time.Minute,
+ expected: defaultDescCacheTTL,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ rm := NewReflectionManager(tt.ttl)
+ defer rm.Close()
+
+ if rm == nil {
+ t.Fatal("NewReflectionManager returned nil")
+ }
+ if rm.cache == nil {
+ t.Error("cache is nil")
+ }
+ if rm.cacheTTL != tt.expected {
+ t.Errorf("cacheTTL = %v, want %v", rm.cacheTTL,
tt.expected)
+ }
+ })
+ }
+}
+
+func TestReflectionManager_InvalidateCache(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ // Store something in the file descriptor cache
+ rm.fileDescCache.Store("test-address:50051", "dummy-value")
+
+ // Verify it's stored
+ if _, ok := rm.fileDescCache.Load("test-address:50051"); !ok {
+ t.Fatal("value not stored in fileDescCache")
+ }
+
+ // Invalidate
+ rm.InvalidateCache("test-address:50051")
+
+ // Verify it's gone
+ if _, ok := rm.fileDescCache.Load("test-address:50051"); ok {
+ t.Error("value still exists after InvalidateCache")
+ }
+}
+
+func TestReflectionManager_ClearCache(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ // Store multiple entries
+ rm.fileDescCache.Store("addr1:50051", "value1")
+ rm.fileDescCache.Store("addr2:50052", "value2")
+ rm.fileDescCache.Store("addr3:50053", "value3")
+
+ // Also set some method cache entries
+ rm.cache.Set("key1", nil)
+ rm.cache.Set("key2", nil)
+
+ // Clear all
+ rm.ClearCache()
+
+ // Verify file descriptor cache is cleared
+ count := 0
+ rm.fileDescCache.Range(func(_, _ any) bool {
+ count++
+ return true
+ })
+ if count != 0 {
+ t.Errorf("fileDescCache has %d entries after ClearCache, want
0", count)
+ }
+
+ // Verify method cache is cleared
+ if size := rm.cache.Size(); size != 0 {
+ t.Errorf("method cache has %d entries after ClearCache, want
0", size)
+ }
+}
+
+func TestReflectionManager_Close(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+
+ // Should not panic
+ rm.Close()
+
+ // Close again should also not panic
+ rm.Close()
+}
+
+func TestReflectionManager_GetCacheStats(t *testing.T) {
+ rm := NewReflectionManager(10 * time.Minute)
+ defer rm.Close()
+
+ // Initially empty
+ stats := rm.GetCacheStats()
+
+ if stats.MethodCacheSize != 0 {
+ t.Errorf("MethodCacheSize = %v, want 0", stats.MethodCacheSize)
+ }
+ if stats.FileRegistryCount != 0 {
+ t.Errorf("FileRegistryCount = %v, want 0",
stats.FileRegistryCount)
+ }
+ if stats.TTLSeconds != 600.0 {
+ t.Errorf("TTLSeconds = %v, want 600", stats.TTLSeconds)
+ }
+
+ // Add some entries
+ rm.fileDescCache.Store("addr1", "value1")
+ rm.fileDescCache.Store("addr2", "value2")
+ rm.cache.Set("method1", nil)
+
+ stats = rm.GetCacheStats()
+ if stats.MethodCacheSize != 1 {
+ t.Errorf("MethodCacheSize = %v, want 1", stats.MethodCacheSize)
+ }
+ if stats.FileRegistryCount != 2 {
+ t.Errorf("FileRegistryCount = %v, want 2",
stats.FileRegistryCount)
+ }
+}
+
+func TestReflectionManager_TopologicalSort(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ tests := []struct {
+ name string
+ fileDescs []*descriptorpb.FileDescriptorProto
+ wantOrder []string // Expected order (dependencies before
dependents)
+ }{
+ {
+ name: "single file no deps",
+ fileDescs: []*descriptorpb.FileDescriptorProto{
+ {Name: proto.String("a.proto")},
+ },
+ wantOrder: []string{"a.proto"},
+ },
+ {
+ name: "linear dependency chain",
+ fileDescs: []*descriptorpb.FileDescriptorProto{
+ {Name: proto.String("c.proto"), Dependency:
[]string{"b.proto"}},
+ {Name: proto.String("b.proto"), Dependency:
[]string{"a.proto"}},
+ {Name: proto.String("a.proto")},
+ },
+ wantOrder: []string{"a.proto", "b.proto", "c.proto"},
+ },
+ {
+ name: "diamond dependency",
+ fileDescs: []*descriptorpb.FileDescriptorProto{
+ {Name: proto.String("d.proto"), Dependency:
[]string{"b.proto", "c.proto"}},
+ {Name: proto.String("b.proto"), Dependency:
[]string{"a.proto"}},
+ {Name: proto.String("c.proto"), Dependency:
[]string{"a.proto"}},
+ {Name: proto.String("a.proto")},
+ },
+ // a must come first, b and c can be in any order, d
must come last
+ wantOrder: nil, // We'll check constraints instead
+ },
+ {
+ name: "external dependency (not in list)",
+ fileDescs: []*descriptorpb.FileDescriptorProto{
+ {Name: proto.String("b.proto"), Dependency:
[]string{"external.proto"}},
+ {Name: proto.String("a.proto")},
+ },
+ wantOrder: nil, // Just verify no error
+ },
+ {
+ name: "empty list",
+ fileDescs: []*descriptorpb.FileDescriptorProto{},
+ wantOrder: []string{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result, err := rm.topologicalSort(tt.fileDescs)
+ if err != nil {
+ t.Fatalf("topologicalSort() error = %v", err)
+ }
+
+ if tt.wantOrder != nil {
+ // Exact order check
+ if len(result) != len(tt.wantOrder) {
+ t.Errorf("got %d files, want %d",
len(result), len(tt.wantOrder))
+ return
+ }
+ for i, fd := range result {
+ if fd.GetName() != tt.wantOrder[i] {
+ t.Errorf("position %d: got %s,
want %s", i, fd.GetName(), tt.wantOrder[i])
+ }
+ }
+ } else {
+ // Just verify length matches and no error
+ if len(result) != len(tt.fileDescs) {
+ t.Errorf("got %d files, want %d",
len(result), len(tt.fileDescs))
+ }
+ }
+ })
+ }
+}
+
+func TestReflectionManager_TopologicalSort_DependencyOrder(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ // Create a complex dependency graph
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {Name: proto.String("leaf1.proto"), Dependency:
[]string{"mid.proto"}},
+ {Name: proto.String("leaf2.proto"), Dependency:
[]string{"mid.proto"}},
+ {Name: proto.String("mid.proto"), Dependency:
[]string{"root.proto"}},
+ {Name: proto.String("root.proto")},
+ }
+
+ result, err := rm.topologicalSort(fileDescs)
+ if err != nil {
+ t.Fatalf("topologicalSort() error = %v", err)
+ }
+
+ // Build position map
+ position := make(map[string]int)
+ for i, fd := range result {
+ position[fd.GetName()] = i
+ }
+
+ // Verify dependency constraints
+ // root should come before mid
+ if position["root.proto"] >= position["mid.proto"] {
+ t.Error("root.proto should come before mid.proto")
+ }
+ // mid should come before leaf1 and leaf2
+ if position["mid.proto"] >= position["leaf1.proto"] {
+ t.Error("mid.proto should come before leaf1.proto")
+ }
+ if position["mid.proto"] >= position["leaf2.proto"] {
+ t.Error("mid.proto should come before leaf2.proto")
+ }
+}
+
+func TestReflectionManager_BuildFileRegistry(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ // Create a simple proto file descriptor
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {
+ Name: proto.String("test.proto"),
+ Package: proto.String("test"),
+ MessageType: []*descriptorpb.DescriptorProto{
+ {
+ Name: proto.String("TestMessage"),
+ Field:
[]*descriptorpb.FieldDescriptorProto{
+ {
+ Name:
proto.String("value"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ },
+ },
+ }
+
+ registry, err := rm.buildFileRegistry(fileDescs)
+ if err != nil {
+ t.Fatalf("buildFileRegistry() error = %v", err)
+ }
+
+ if registry == nil {
+ t.Fatal("buildFileRegistry() returned nil registry")
+ }
+
+ // Verify the file was registered
+ fd, err := registry.FindFileByPath("test.proto")
+ if err != nil {
+ t.Fatalf("FindFileByPath() error = %v", err)
+ }
+
+ if fd.Package().Name() != "test" {
+ t.Errorf("package = %q, want %q", fd.Package().Name(), "test")
+ }
+}
+
+func TestReflectionManager_BuildFileRegistry_WithDependencies(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ // Create proto files with dependency
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {
+ Name: proto.String("base.proto"),
+ Package: proto.String("base"),
+ MessageType: []*descriptorpb.DescriptorProto{
+ {
+ Name: proto.String("BaseMessage"),
+ Field:
[]*descriptorpb.FieldDescriptorProto{
+ {
+ Name:
proto.String("id"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ },
+ },
+ {
+ Name: proto.String("derived.proto"),
+ Package: proto.String("derived"),
+ Dependency: []string{"base.proto"},
+ MessageType: []*descriptorpb.DescriptorProto{
+ {
+ Name: proto.String("DerivedMessage"),
+ Field:
[]*descriptorpb.FieldDescriptorProto{
+ {
+ Name:
proto.String("base"),
+ Number:
proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
+ TypeName:
proto.String(".base.BaseMessage"),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ },
+ },
+ }
+
+ registry, err := rm.buildFileRegistry(fileDescs)
+ if err != nil {
+ t.Fatalf("buildFileRegistry() error = %v", err)
+ }
+
+ // Verify both files were registered
+ _, err = registry.FindFileByPath("base.proto")
+ if err != nil {
+ t.Errorf("FindFileByPath(base.proto) error = %v", err)
+ }
+
+ _, err = registry.FindFileByPath("derived.proto")
+ if err != nil {
+ t.Errorf("FindFileByPath(derived.proto) error = %v", err)
+ }
+}
+
+func TestReflectionManager_BuildFileRegistry_Empty(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ registry, err :=
rm.buildFileRegistry([]*descriptorpb.FileDescriptorProto{})
+ if err != nil {
+ t.Fatalf("buildFileRegistry() error = %v", err)
+ }
+
+ if registry == nil {
+ t.Error("buildFileRegistry() returned nil for empty input")
+ }
+}
+
+func TestReflectionManager_BuildFileRegistry_WithService(t *testing.T) {
+ rm := NewReflectionManager(5 * time.Minute)
+ defer rm.Close()
+
+ fileDescs := []*descriptorpb.FileDescriptorProto{
+ {
+ Name: proto.String("service.proto"),
+ Package: proto.String("myservice"),
+ MessageType: []*descriptorpb.DescriptorProto{
+ {
+ Name: proto.String("Request"),
+ Field:
[]*descriptorpb.FieldDescriptorProto{
+ {
+ Name:
proto.String("query"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ {
+ Name: proto.String("Response"),
+ Field:
[]*descriptorpb.FieldDescriptorProto{
+ {
+ Name:
proto.String("result"),
+ Number: proto.Int32(1),
+ Type:
descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+ Label:
descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+ },
+ },
+ },
+ },
+ Service: []*descriptorpb.ServiceDescriptorProto{
+ {
+ Name: proto.String("MyService"),
+ Method:
[]*descriptorpb.MethodDescriptorProto{
+ {
+ Name:
proto.String("Search"),
+ InputType:
proto.String(".myservice.Request"),
+ OutputType:
proto.String(".myservice.Response"),
+ },
+ },
+ },
+ },
+ },
+ }
+
+ registry, err := rm.buildFileRegistry(fileDescs)
+ if err != nil {
+ t.Fatalf("buildFileRegistry() error = %v", err)
+ }
+
+ // Find service descriptor
+ desc, err := registry.FindDescriptorByName("myservice.MyService")
+ if err != nil {
+ t.Fatalf("FindDescriptorByName() error = %v", err)
+ }
+
+ if desc.Name() != "MyService" {
+ t.Errorf("service name = %q, want %q", desc.Name(), "MyService")
+ }
+}