This is an automated email from the ASF dual-hosted git repository.

xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5ac91629 feat(grpc-proxy): add gRPC Server Reflection support (#821) 
(#849)
5ac91629 is described below

commit 5ac916297109daabdad749c0f53530788e64bbef
Author: Tsukikage <[email protected]>
AuthorDate: Fri Jan 9 21:56:13 2026 +0800

    feat(grpc-proxy): add gRPC Server Reflection support (#821) (#849)
    
    Add gRPC reflection mode to enable content-aware proxying:
    - Support three modes: passthrough, reflection, hybrid
    - Add descriptor caching with TTL and LRU eviction
    - Add dynamic codec for message inspection
    - Fix unary call deadlock by detecting stream type
---
 docs/user/filter/grpcproxy.md                      | 511 +++++++++++++++
 docs/user/filter/grpcproxy_CN.md                   | 511 +++++++++++++++
 .../grpcproxy/filter/proxy/boundary_test.go        | 394 ++++++++++++
 .../grpcproxy/filter/proxy/descriptor_cache.go     | 301 +++++++++
 .../filter/proxy/descriptor_cache_test.go          | 170 +++++
 .../grpcproxy/filter/proxy/dynamic_codec.go        | 235 +++++++
 .../grpcproxy/filter/proxy/dynamic_codec_test.go   | 559 ++++++++++++++++
 .../grpcproxy/filter/proxy/grpc_proxy_filter.go    | 189 +++++-
 .../filter/proxy/grpc_proxy_filter_test.go         | 193 ++++++
 .../network/grpcproxy/filter/proxy/protocol.go     |  51 ++
 .../grpcproxy/filter/proxy/protocol_test.go        | 122 ++++
 .../grpcproxy/filter/proxy/reflection_manager.go   | 709 +++++++++++++++++++++
 .../filter/proxy/reflection_manager_test.go        | 463 ++++++++++++++
 13 files changed, 4376 insertions(+), 32 deletions(-)

diff --git a/docs/user/filter/grpcproxy.md b/docs/user/filter/grpcproxy.md
new file mode 100644
index 00000000..af13d4de
--- /dev/null
+++ b/docs/user/filter/grpcproxy.md
@@ -0,0 +1,511 @@
+# gRPC Proxy Filter (dgp.filter.grpc.proxy)
+
+English | [中文](grpcproxy_CN.md)
+
+---
+
+## Overview
+
+The `dgp.filter.grpc.proxy` filter provides gRPC proxy functionality for the Pixiu gateway, with support for **gRPC Server Reflection**. This enables dynamic message parsing and inspection at the gateway level without requiring pre-compiled proto files.
+
+### Key Features
+
+- **Three Reflection Modes**: Choose between performance and functionality 
based on your needs
+- **Dynamic Message Decoding**: Parse and inspect gRPC messages at runtime 
without proto files
+- **TTL-based Caching**: Efficient caching of method descriptors with 
automatic cleanup
+- **Protocol Detection**: Support for both gRPC and Triple protocol 
compatibility
+- **Graceful Fallback**: Hybrid mode provides automatic fallback to passthrough
+
+---
+
+## Reflection Modes
+
+### Passthrough Mode (Default)
+
+Performs transparent binary proxying without decoding messages.
+
+**Characteristics:**
+- Highest performance (no message parsing overhead)
+- No content inspection capabilities
+- Pure binary forwarding
+
+**Use Cases:**
+- High-throughput scenarios where message inspection is not needed
+- Simple routing based on service/method names only
+- When backward compatibility is critical
+
+**Configuration:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "passthrough"  # or omit (default)
+```
+
+### Reflection Mode
+
+Uses the gRPC Server Reflection API to dynamically decode and inspect message contents.
+
+**Characteristics:**
+- Full message decoding at runtime
+- Enables content-based routing and filtering
+- Requires backend server to support reflection
+- Slight performance overhead due to reflection calls
+
+**Use Cases:**
+- Content-aware routing (route based on message fields)
+- Field-level filtering or transformation
+- Logging and debugging with full message inspection
+- API gateway scenarios requiring message validation
+
+**Configuration:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      descriptor_cache_ttl: 300  # 5 minutes cache
+```
+
+### Hybrid Mode
+
+Tries reflection first, falls back to passthrough on failure.
+
+**Characteristics:**
+- Best of both worlds: content inspection when available
+- Graceful degradation for services without reflection
+- Reflection timeout prevents blocking
+
+**Use Cases:**
+- Mixed environments with varying reflection support
+- Migration scenarios (gradually enabling reflection)
+- Production environments requiring high availability
+
+**Configuration:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "hybrid"
+      reflection_timeout: 5s  # Max time to wait for reflection
+```
+
+---
+
+## Configuration
+
+### Complete Configuration Example
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-gateway"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/echo.EchoService/"
+                    route:
+                      cluster: "echo-grpc"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config:
+                    # Reflection mode (default: "passthrough")
+                    reflection_mode: "reflection"
+
+                    # Cache TTL for method descriptors (default: 300s)
+                    descriptor_cache_ttl: 300
+
+                    # Enable Triple protocol detection (default: false)
+                    enable_protocol_detection: true
+
+                    # Reflection timeout for hybrid mode (default: 5s)
+                    reflection_timeout: 5s
+
+                    # TLS configuration (optional)
+                    enable_tls: false
+                    tls_cert_file: ""
+                    tls_key_file: ""
+
+                    # Connection settings (optional)
+                    keepalive_time: 300s
+                    keepalive_timeout: 5s
+                    connect_timeout: 5s
+                    max_concurrent_streams: 0
+
+  clusters:
+    - name: "echo-grpc"
+      lb_policy: "RoundRobin"
+      endpoints:
+        - socket_address:
+            address: 127.0.0.1
+            port: 50051
+            protocol_type: "GRPC"
+```
+
+### Configuration Fields
+
+| Field | Type | Default | Description |
+|-------|------|---------|-------------|
+| `reflection_mode` | string | `"passthrough"` | Reflection mode: `"passthrough"`, `"reflection"`, or `"hybrid"` |
+| `descriptor_cache_ttl` | int | `300` | Cache TTL for method descriptors, in seconds |
+| `enable_protocol_detection` | bool | `false` | Enable Triple protocol detection |
+| `reflection_timeout` | string | `"5s"` | Max time to wait for reflection in hybrid mode |
+| `enable_tls` | bool | `false` | Enable TLS for backend connections |
+| `tls_cert_file` | string | `""` | Path to TLS certificate file |
+| `tls_key_file` | string | `""` | Path to TLS key file |
+| `keepalive_time` | string | `"300s"` | Keepalive time for backend connections |
+| `keepalive_timeout` | string | `"5s"` | Keepalive timeout |
+| `connect_timeout` | string | `"5s"` | Connection timeout |
+| `max_concurrent_streams` | uint32 | `0` (unlimited) | Maximum concurrent streams per connection |
+
+---
+
+## Enabling Server Reflection
+
+To use `reflection` or `hybrid` modes, your backend gRPC server must have 
Server Reflection enabled.
+
+### Go (gRPC-Go)
+
+```go
+import (
+    "log"
+    "net"
+
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/reflection"
+    // also import your generated stub package, referenced below as "echo"
+)
+
+func main() {
+    server := grpc.NewServer()
+
+    // Register your service
+    echo.RegisterEchoServiceServer(server, &echoServer{})
+
+    // Enable server reflection (IMPORTANT!)
+    reflection.Register(server)
+
+    // Start server
+    lis, err := net.Listen("tcp", ":50051")
+    if err != nil {
+        log.Fatalf("failed to listen: %v", err)
+    }
+    if err := server.Serve(lis); err != nil {
+        log.Fatalf("failed to serve: %v", err)
+    }
+}
+```
+
+### Java (gRPC-Java)
+
+```java
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.protobuf.services.ProtoReflectionService;
+
+public class EchoServer {
+    public static void main(String[] args) throws Exception {
+        Server server = ServerBuilder
+            .forPort(50051)
+            .addService(new EchoServiceImpl())
+            // Enable server reflection (requires the grpc-services artifact)
+            .addService(ProtoReflectionService.newInstance())
+            .build();
+
+        server.start();
+        server.awaitTermination();
+    }
+}
+```
+
+### Python (gRPC-Python)
+
+```python
+import grpc
+from grpc_reflection.v1alpha import reflection
+from concurrent import futures
+
+import echo_pb2
+import echo_pb2_grpc
+
+class EchoService(echo_pb2_grpc.EchoServiceServicer):
+    # ... implementation ...
+    pass
+
+def serve():
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
+
+    # Enable server reflection
+    service_names = (
+        echo_pb2.DESCRIPTOR.services_by_name['EchoService'].full_name,
+        reflection.SERVICE_NAME,
+    )
+    reflection.enable_server_reflection(service_names, server)
+
+    server.add_insecure_port('[::]:50051')
+    server.start()
+    server.wait_for_termination()
+```
+
+---
+
+## Mode Comparison
+
+| Feature | Passthrough | Reflection | Hybrid |
+|---------|-------------|------------|--------|
+| **Performance** | ⭐⭐⭐⭐⭐ (Best) | ⭐⭐⭐ (Good) | ⭐⭐⭐⭐ (Better) |
+| **Message Inspection** | ❌ No | ✅ Yes | ✅ Yes (when available) |
+| **Requires Reflection** | ❌ No | ✅ Yes | Optional |
+| **Fallback** | N/A | ❌ No | ✅ Yes |
+| **Use Case** | High-performance proxy | Content-aware routing | Mixed environments |
+
+---
+
+## Descriptor Cache
+
+The reflection mode uses a TTL-based cache to store method descriptors 
retrieved from the backend server. This reduces the number of reflection 
requests and improves performance.
+
+### Cache Behavior
+
+| Operation | Description |
+|-----------|-------------|
+| **Cache Hit** | Return cached descriptor immediately |
+| **Cache Miss** | Fetch descriptor via reflection API, cache it |
+| **Expiration** | Descriptors expire after TTL, re-fetched on next access |
+| **Eviction** | LRU eviction when cache is full |
+
+### Cache Configuration
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      # Cache TTL in seconds (default: 300)
+      descriptor_cache_ttl: 300
+```
+
+**Recommended TTL Values:**
+- Development: `60` (1 minute) - frequent updates
+- Testing: `300` (5 minutes) - balanced
+- Production: `1800` (30 minutes) - stable services
+
+---
+
+## Triple Protocol Detection
+
+Pixiu supports the **Dubbo Triple protocol**, a gRPC-compatible protocol 
developed by the Apache Dubbo community.
+
+### What is Triple Protocol?
+
+Triple is a gRPC-compatible protocol that:
+- Uses HTTP/2 like gRPC
+- Supports Protobuf serialization
+- Adds Triple-specific metadata headers (`tri-*`)
+- Enables gRPC services to work with Dubbo ecosystem
+
+### Enabling Protocol Detection
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      # Enable Triple protocol detection
+      enable_protocol_detection: true
+```
+
+**When enabled:**
+- Pixiu automatically detects Triple vs gRPC protocol
+- Extracts Triple-specific metadata (`tri-service-version`, `tri-group`, etc.)
+- Ensures compatibility with Dubbo Triple services
+
+---
+
+## Usage Examples
+
+### Example 1: Simple Passthrough Proxy
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-proxy"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/"
+                    route:
+                      cluster: "backend-grpc"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config: {}  # Default passthrough mode
+  clusters:
+    - name: "backend-grpc"
+      endpoints:
+        - socket_address:
+            address: backend.example.com
+            port: 50051
+```
+
+### Example 2: Content-Aware Routing with Reflection
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-gateway"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/myapi/"
+                    route:
+                      cluster: "api-v1"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config:
+                    reflection_mode: "reflection"
+                    descriptor_cache_ttl: 600
+```
+
+### Example 3: Hybrid Mode for Migration
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      # Try reflection first, fallback to passthrough
+      reflection_mode: "hybrid"
+      reflection_timeout: 3s
+      enable_protocol_detection: true
+```
+
+---
+
+## Performance Considerations
+
+### Mode Performance
+
+| Mode | Latency Impact | Throughput | Memory Usage |
+|------|----------------|------------|--------------|
+| Passthrough | Minimal | Highest | Lowest |
+| Reflection | +10-20% | High | Moderate (cache) |
+| Hybrid | +5-10% | Higher | Moderate |
+
+### Optimization Tips
+
+1. **Use Passthrough** when message inspection is not needed
+2. **Enable Caching** with appropriate TTL for reflection mode
+3. **Set Reasonable Timeout** for hybrid mode to prevent blocking
+4. **Monitor Cache Hit Ratio** to tune TTL values
+
+---
+
+## Troubleshooting
+
+### Problem: Reflection mode returns "service not found"
+
+**Cause**: Backend server does not have Server Reflection enabled.
+
+**Solution**: Enable reflection on your server:
+```go
+reflection.Register(grpcServer)
+```
+
+### Problem: Hybrid mode falls back to passthrough
+
+**Cause**: Reflection timeout exceeded or reflection service unavailable.
+
+**Solution**:
+1. Check if reflection is enabled on the backend
+2. Increase `reflection_timeout` value
+3. Check network connectivity to reflection service
+
+### Problem: High memory usage
+
+**Cause**: Descriptor cache too large or TTL too long.
+
+**Solution**:
+```yaml
+# Reduce cache TTL
+descriptor_cache_ttl: 60  # 1 minute instead of 5 minutes
+```
+
+### Problem: Triple services not working
+
+**Cause**: Protocol detection not enabled.
+
+**Solution**:
+```yaml
+enable_protocol_detection: true
+```
+
+---
+
+## Migration Guide
+
+### From Passthrough to Reflection
+
+**Step 1**: Enable reflection on backend servers
+
+**Step 2**: Update Pixiu configuration:
+```yaml
+# Before
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config: {}
+
+# After
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+```
+
+### From Legacy ForceCodec to Reflection
+
+**Old Configuration (Deprecated):**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      force_codec: "passthrough"
+```
+
+**New Configuration (Recommended):**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "passthrough"  # Same behavior, clearer name
+```
+
+---
+
+## Related Resources
+
+- [gRPC Server Reflection Protocol](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) - Official specification
+- [Issue #821](https://github.com/apache/dubbo-go-pixiu/issues/821) - Original feature request
+- [PR #849](https://github.com/apache/dubbo-go-pixiu/pull/849) - Implementation details
+- [Sample Code](https://github.com/apache/dubbo-go-pixiu-samples/tree/master/grpc/reflection) - Complete working examples
+
+---
+
+## Notes
+
+- **Filter Order**: gRPC Proxy filter should be the **only** filter in 
`grpc_filters` list
+- **Server Reflection**: Required for `reflection` mode, optional for `hybrid` 
mode
+- **Thread Safety**: Filter is thread-safe and supports high-concurrency 
scenarios
+- **Backward Compatibility**: `force_codec` field is deprecated; use 
`reflection_mode` instead
diff --git a/docs/user/filter/grpcproxy_CN.md b/docs/user/filter/grpcproxy_CN.md
new file mode 100644
index 00000000..e907896e
--- /dev/null
+++ b/docs/user/filter/grpcproxy_CN.md
@@ -0,0 +1,511 @@
+# gRPC 代理过滤器 (dgp.filter.grpc.proxy)
+
+[English](grpcproxy.md) | 中文
+
+---
+
+## 概述
+
+`dgp.filter.grpc.proxy` 过滤器为 Pixiu 网关提供 gRPC 代理功能,并支持 **gRPC Server 
Reflection**。该特性使得网关可以在不需要预编译 proto 文件的情况下,动态解析和检查消息内容。
+
+### 核心特性
+
+- **三种反射模式**: 根据需求在性能和功能之间选择
+- **动态消息解码**: 在运行时解析和检查 gRPC 消息,无需 proto 文件
+- **基于 TTL 的缓存**: 高效的方法描述符缓存,自动清理过期条目
+- **协议检测**: 同时支持 gRPC 和 Triple 协议
+- **优雅降级**: 混合模式提供自动回退到透传模式
+
+---
+
+## 反射模式
+
+### 透传模式(Passthrough,默认)
+
+执行透明的二进制代理,不解码消息。
+
+**特点:**
+- 最高性能(无消息解析开销)
+- 无法检查消息内容
+- 纯二进制转发
+
+**使用场景:**
+- 不需要消息检查的高吞吐场景
+- 仅基于服务/方法名称的简单路由
+- 对向后兼容性要求极高的场景
+
+**配置:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "passthrough"  # 或省略(默认值)
+```
+
+### 反射模式(Reflection)
+
+使用 gRPC Server Reflection API 动态解码和检查消息内容。
+
+**特点:**
+- 运行时完整消息解码
+- 支持基于内容的路由和过滤
+- 需要后端服务器支持反射
+- 由于反射调用,有轻微性能开销
+
+**使用场景:**
+- 基于内容的路由(根据消息字段路由)
+- 字段级别的过滤或转换
+- 完整消息检查的日志记录和调试
+- 需要消息验证的 API 网关场景
+
+**配置:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      descriptor_cache_ttl: 300  # 5分钟缓存
+```
+
+### 混合模式(Hybrid)
+
+先尝试反射,失败时回退到透传模式。
+
+**特点:**
+- 两全其美:可用时进行内容检查
+- 对不支持反射的服务优雅降级
+- 反射超时防止阻塞
+
+**使用场景:**
+- 反射支持程度不一的混合环境
+- 迁移场景(逐步启用反射)
+- 需要高可用的生产环境
+
+**配置:**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "hybrid"
+      reflection_timeout: 5s  # 等待反射的最大时间
+```
+
+---
+
+## 配置
+
+### 完整配置示例
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-gateway"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/echo.EchoService/"
+                    route:
+                      cluster: "echo-grpc"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config:
+                    # 反射模式(默认: "passthrough")
+                    reflection_mode: "reflection"
+
+                    # 方法描述符缓存 TTL(默认: 300秒)
+                    descriptor_cache_ttl: 300
+
+                    # 启用 Triple 协议检测(默认: false)
+                    enable_protocol_detection: true
+
+                    # 混合模式的反射超时(默认: 5秒)
+                    reflection_timeout: 5s
+
+                    # TLS 配置(可选)
+                    enable_tls: false
+                    tls_cert_file: ""
+                    tls_key_file: ""
+
+                    # 连接设置(可选)
+                    keepalive_time: 300s
+                    keepalive_timeout: 5s
+                    connect_timeout: 5s
+                    max_concurrent_streams: 0
+
+  clusters:
+    - name: "echo-grpc"
+      lb_policy: "RoundRobin"
+      endpoints:
+        - socket_address:
+            address: 127.0.0.1
+            port: 50051
+            protocol_type: "GRPC"
+```
+
+### 配置字段
+
+| 字段 | 类型 | 默认值 | 描述 |
+|------|------|--------|------|
+| `reflection_mode` | string | `"passthrough"` | 反射模式: `"passthrough"`、`"reflection"` 或 `"hybrid"` |
+| `descriptor_cache_ttl` | int | `300` | 方法描述符缓存 TTL(秒) |
+| `enable_protocol_detection` | bool | `false` | 启用 Triple 协议检测 |
+| `reflection_timeout` | string | `"5s"` | 混合模式下的反射超时时间 |
+| `enable_tls` | bool | `false` | 启用后端连接的 TLS |
+| `tls_cert_file` | string | `""` | TLS 证书文件路径 |
+| `tls_key_file` | string | `""` | TLS 密钥文件路径 |
+| `keepalive_time` | string | `"300s"` | 后端连接的保活时间 |
+| `keepalive_timeout` | string | `"5s"` | 保活超时时间 |
+| `connect_timeout` | string | `"5s"` | 连接超时时间 |
+| `max_concurrent_streams` | uint32 | `0`(无限制) | 最大并发流数 |
+
+---
+
+## 启用服务器反射
+
+要使用 `reflection` 或 `hybrid` 模式,你的后端 gRPC 服务器必须启用 Server Reflection。
+
+### Go (gRPC-Go)
+
+```go
+import (
+    "log"
+    "net"
+
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/reflection"
+    // 同时需要导入你的生成代码包,下面以 "echo" 引用
+)
+
+func main() {
+    server := grpc.NewServer()
+
+    // 注册你的服务
+    echo.RegisterEchoServiceServer(server, &echoServer{})
+
+    // 启用服务器反射(重要!)
+    reflection.Register(server)
+
+    // 启动服务器
+    lis, err := net.Listen("tcp", ":50051")
+    if err != nil {
+        log.Fatalf("监听失败: %v", err)
+    }
+    if err := server.Serve(lis); err != nil {
+        log.Fatalf("服务启动失败: %v", err)
+    }
+}
+```
+
+### Java (gRPC-Java)
+
+```java
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.protobuf.services.ProtoReflectionService;
+
+public class EchoServer {
+    public static void main(String[] args) throws Exception {
+        Server server = ServerBuilder
+            .forPort(50051)
+            .addService(new EchoServiceImpl())
+            // 启用服务器反射(需要 grpc-services 依赖)
+            .addService(ProtoReflectionService.newInstance())
+            .build();
+
+        server.start();
+        server.awaitTermination();
+    }
+}
+```
+
+### Python (gRPC-Python)
+
+```python
+import grpc
+from grpc_reflection.v1alpha import reflection
+from concurrent import futures
+
+import echo_pb2
+import echo_pb2_grpc
+
+class EchoService(echo_pb2_grpc.EchoServiceServicer):
+    # ... 实现 ...
+    pass
+
+def serve():
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
+
+    # 启用服务器反射
+    service_names = (
+        echo_pb2.DESCRIPTOR.services_by_name['EchoService'].full_name,
+        reflection.SERVICE_NAME,
+    )
+    reflection.enable_server_reflection(service_names, server)
+
+    server.add_insecure_port('[::]:50051')
+    server.start()
+    server.wait_for_termination()
+```
+
+---
+
+## 模式对比
+
+| 特性 | 透传模式 | 反射模式 | 混合模式 |
+|------|---------|---------|---------|
+| **性能** | ⭐⭐⭐⭐⭐(最佳) | ⭐⭐⭐(良好) | ⭐⭐⭐⭐(更好) |
+| **消息检查** | ❌ 否 | ✅ 是 | ✅ 是(可用时) |
+| **需要反射** | ❌ 否 | ✅ 是 | 可选 |
+| **回退支持** | N/A | ❌ 否 | ✅ 是 |
+| **使用场景** | 高性能代理 | 基于内容的路由 | 混合环境 |
+
+---
+
+## 描述符缓存
+
+反射模式使用基于 TTL 的缓存来存储从后端服务器获取的方法描述符。这减少了反射请求的数量并提高了性能。
+
+### 缓存行为
+
+| 操作 | 描述 |
+|------|------|
+| **缓存命中** | 立即返回缓存的描述符 |
+| **缓存未命中** | 通过反射 API 获取描述符并缓存 |
+| **过期** | 描述符在 TTL 后过期,下次访问时重新获取 |
+| **驱逐** | 缓存满时进行 LRU 驱逐 |
+
+### 缓存配置
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      # 缓存 TTL(秒),默认: 300
+      descriptor_cache_ttl: 300
+```
+
+**推荐的 TTL 值:**
+- 开发环境: `60`(1分钟)- 频繁更新
+- 测试环境: `300`(5分钟)- 平衡
+- 生产环境: `1800`(30分钟)- 稳定服务
+
+---
+
+## Triple 协议检测
+
+Pixiu 支持 **Dubbo Triple 协议**,这是 Apache Dubbo 社区开发的 gRPC 兼容协议。
+
+### 什么是 Triple 协议?
+
+Triple 是一个与 gRPC 兼容的协议,具有以下特点:
+- 像 gRPC 一样使用 HTTP/2
+- 支持 Protobuf 序列化
+- 添加 Triple 特定的元数据头(`tri-*`)
+- 使 gRPC 服务能与 Dubbo 生态系统协作
+
+### 启用协议检测
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+      # 启用 Triple 协议检测
+      enable_protocol_detection: true
+```
+
+**启用后:**
+- Pixiu 自动检测 Triple 或 gRPC 协议
+- 提取 Triple 特定的元数据(`tri-service-version`、`tri-group` 等)
+- 确保与 Dubbo Triple 服务的兼容性
+
+---
+
+## 使用示例
+
+### 示例 1: 简单透传代理
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-proxy"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/"
+                    route:
+                      cluster: "backend-grpc"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config: {}  # 默认透传模式
+  clusters:
+    - name: "backend-grpc"
+      endpoints:
+        - socket_address:
+            address: backend.example.com
+            port: 50051
+```
+
+### 示例 2: 使用反射进行基于内容的路由
+
+```yaml
+static_resources:
+  listeners:
+    - name: "grpc-gateway"
+      protocol_type: "GRPC"
+      address:
+        socket_address:
+          address: "0.0.0.0"
+          port: 8882
+      filter_chains:
+        filters:
+          - name: dgp.filter.network.grpcconnectionmanager
+            config:
+              route_config:
+                routes:
+                  - match:
+                      prefix: "/myapi/"
+                    route:
+                      cluster: "api-v1"
+              grpc_filters:
+                - name: dgp.filter.grpc.proxy
+                  config:
+                    reflection_mode: "reflection"
+                    descriptor_cache_ttl: 600
+```
+
+### 示例 3: 迁移场景的混合模式
+
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      # 先尝试反射,失败时回退到透传
+      reflection_mode: "hybrid"
+      reflection_timeout: 3s
+      enable_protocol_detection: true
+```
+
+---
+
+## 性能考虑
+
+### 模式性能
+
+| 模式 | 延迟影响 | 吞吐量 | 内存使用 |
+|------|---------|--------|---------|
+| 透传模式 | 最小 | 最高 | 最低 |
+| 反射模式 | +10-20% | 高 | 中等(缓存) |
+| 混合模式 | +5-10% | 较高 | 中等 |
+
+### 优化建议
+
+1. **使用透传模式** 当不需要消息检查时
+2. **启用缓存** 并为反射模式设置合适的 TTL
+3. **设置合理的超时** 防止混合模式阻塞
+4. **监控缓存命中率** 以调整 TTL 值
+
+---
+
+## 故障排除
+
+### 问题: 反射模式返回 "service not found"
+
+**原因**: 后端服务器未启用 Server Reflection。
+
+**解决方案**: 在服务器上启用反射:
+```go
+reflection.Register(grpcServer)
+```
+
+### 问题: 混合模式回退到透传模式
+
+**原因**: 反射超时或反射服务不可用。
+
+**解决方案**:
+1. 检查后端是否启用了反射
+2. 增加 `reflection_timeout` 值
+3. 检查到反射服务的网络连接
+
+### 问题: 内存使用过高
+
+**原因**: 描述符缓存过大或 TTL 过长。
+
+**解决方案**:
+```yaml
+# 减少缓存 TTL
+descriptor_cache_ttl: 60  # 1分钟而不是5分钟
+```
+
+### 问题: Triple 服务不工作
+
+**原因**: 未启用协议检测。
+
+**解决方案**:
+```yaml
+enable_protocol_detection: true
+```
+
+---
+
+## 迁移指南
+
+### 从透传模式迁移到反射模式
+
+**步骤 1**: 在后端服务器上启用反射
+
+**步骤 2**: 更新 Pixiu 配置:
+```yaml
+# 之前
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config: {}
+
+# 之后
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "reflection"
+```
+
+### 从旧的 ForceCodec 迁移到反射模式
+
+**旧配置(已弃用):**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      force_codec: "passthrough"
+```
+
+**新配置(推荐):**
+```yaml
+grpc_filters:
+  - name: dgp.filter.grpc.proxy
+    config:
+      reflection_mode: "passthrough"  # 行为相同,名称更清晰
+```
+
+---
+
+## 相关资源
+
+- [gRPC Server Reflection Protocol](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) - 官方规范
+- [Issue #821](https://github.com/apache/dubbo-go-pixiu/issues/821) - 原始功能请求
+- [PR #849](https://github.com/apache/dubbo-go-pixiu/pull/849) - 实现详情
+- [示例代码](https://github.com/apache/dubbo-go-pixiu-samples/tree/master/grpc/reflection) - 完整的示例代码
+
+---
+
+## 注意事项
+
+- **过滤器顺序**: gRPC Proxy 过滤器应该是 `grpc_filters` 列表中的**唯一**过滤器
+- **服务器反射**: `reflection` 模式必需,`hybrid` 模式可选
+- **线程安全**: 过滤器是线程安全的,支持高并发场景
+- **向后兼容**: `force_codec` 字段已弃用;请使用 `reflection_mode` 替代
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go 
b/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go
new file mode 100644
index 00000000..d604940d
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/boundary_test.go
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "testing"
+       "time"
+)
+
+import (
+       "go.uber.org/zap"
+
+       "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// TestCacheStats tests cache statistics tracking
+func TestCacheStats(t *testing.T) {
+       cache := NewDescriptorCache(5 * time.Minute)
+
+       // Initially no hits or misses
+       stats := cache.GetStats()
+       if stats.Hits != 0 || stats.Misses != 0 {
+               t.Errorf("Expected zero stats, got hits=%d, misses=%d", 
stats.Hits, stats.Misses)
+       }
+
+       // Generate some misses
+       cache.Get("nonexistent")
+       cache.Get("also-nonexistent")
+
+       stats = cache.GetStats()
+       if stats.Misses != 2 {
+               t.Errorf("Expected 2 misses, got %d", stats.Misses)
+       }
+
+       // Check max cache size
+       if stats.MaxSize != defaultMaxCacheSize {
+               t.Errorf("Expected max_cache_size %d, got %d",
+                       defaultMaxCacheSize, stats.MaxSize)
+       }
+
+       // Check TTL
+       if stats.TTL.Seconds() != 300 {
+               t.Errorf("Expected TTL 300s, got %v", stats.TTL.Seconds())
+       }
+
+       // Reset stats
+       cache.ResetStats()
+       stats = cache.GetStats()
+       if stats.Hits != 0 || stats.Misses != 0 {
+               t.Errorf("Expected stats to be reset, got hits=%d, misses=%d", 
stats.Hits, stats.Misses)
+       }
+}
+
+// TestDynamicCacheSizeAdjustment tests dynamic cache size adjustment
+func TestDynamicCacheSizeAdjustment(t *testing.T) {
+       cache := NewDescriptorCache(5 * time.Minute)
+
+       // Setting below minimum should adjust to minimum
+       cache.SetMaxSize(10)
+       stats := cache.GetStats()
+       if stats.MaxSize != minCacheSize {
+               t.Errorf("Expected MaxSize %d (minCacheSize), got %d", 
minCacheSize, stats.MaxSize)
+       }
+
+       // Setting above minimum should work
+       cache.SetMaxSize(500)
+       stats = cache.GetStats()
+       if stats.MaxSize != 500 {
+               t.Errorf("Expected MaxSize 500, got %d", stats.MaxSize)
+       }
+}
+
+// TestInvalidateByVersion tests version-based cache invalidation API
+func TestInvalidateByVersion(t *testing.T) {
+       cache := NewDescriptorCache(5 * time.Minute)
+
+       // Test with non-existent version hash
+       count := cache.InvalidateByVersion("non-existent-version")
+       if count != 0 {
+               t.Errorf("Expected 0 invalidations for non-existent version, 
got %d", count)
+       }
+}
+
+// TestReflectionManagerConfig tests configuration updates
+func TestReflectionManagerConfig(t *testing.T) {
+       config := ReflectionConfig{
+               CacheTTL:          10 * time.Minute,
+               MaxCacheSize:      500,
+               ReflectionVersion: ReflectionV1Alpha,
+               ContinueOnError:   true,
+       }
+
+       rm := NewReflectionManagerWithConfig(config)
+
+       // Verify initial config
+       initialConfig := rm.GetConfig()
+       if initialConfig.MaxCacheSize != 500 {
+               t.Errorf("Expected MaxCacheSize 500, got %d", 
initialConfig.MaxCacheSize)
+       }
+
+       // Update config
+       newConfig := ReflectionConfig{
+               MaxCacheSize:    1000,
+               ContinueOnError: false,
+       }
+       rm.SetConfig(newConfig)
+
+       updatedConfig := rm.GetConfig()
+       if updatedConfig.MaxCacheSize != 1000 {
+               t.Errorf("Expected MaxCacheSize 1000, got %d", updatedConfig.MaxCacheSize)
+       }
+       if updatedConfig.ContinueOnError != false {
+               t.Error("Expected ContinueOnError to be false")
+       }
+}
+
+// TestReflectionManagerCacheStats tests enhanced cache statistics
+func TestReflectionManagerCacheStats(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       stats := rm.GetCacheStats()
+
+       // Verify stats struct has expected values
+       if stats.MaxCacheSize != defaultMaxCacheSize {
+               t.Errorf("Expected MaxCacheSize %d, got %d",
+                       defaultMaxCacheSize, stats.MaxCacheSize)
+       }
+
+       if stats.TTLSeconds != 300 {
+               t.Errorf("Expected TTLSeconds 300, got %v", stats.TTLSeconds)
+       }
+
+       // Initial stats should be zero
+       if stats.MethodCacheHits != 0 {
+               t.Errorf("Expected MethodCacheHits 0, got %d", stats.MethodCacheHits)
+       }
+
+       if stats.FileRegistryCount != 0 {
+               t.Errorf("Expected FileRegistryCount 0, got %d", stats.FileRegistryCount)
+       }
+}
+
+// TestGetMissingDependencies tests missing dependency tracking
+func TestGetMissingDependencies(t *testing.T) {
+       rm := NewReflectionManagerWithConfig(ReflectionConfig{
+               ContinueOnError: true,
+       })
+
+       // Initially no missing dependencies
+       missing := rm.GetMissingDependencies("test-address")
+       if missing != nil {
+               t.Errorf("Expected nil missing dependencies, got %v", missing)
+       }
+
+       // Get all missing dependencies should return empty map
+       allMissing := rm.GetAllMissingDependencies()
+       if len(allMissing) != 0 {
+               t.Errorf("Expected empty map, got %d entries", len(allMissing))
+       }
+}
+
+// TestReflectionManagerContinueOnError tests the ContinueOnError behavior
+func TestReflectionManagerContinueOnError(t *testing.T) {
+       tests := []struct {
+               name            string
+               continueOnError bool
+       }{
+               {
+                       name:            "continue on error enabled",
+                       continueOnError: true,
+               },
+               {
+                       name:            "continue on error disabled",
+                       continueOnError: false,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       config := ReflectionConfig{
+                               CacheTTL:          5 * time.Minute,
+                               ContinueOnError:   tt.continueOnError,
+                               ReflectionVersion: ReflectionV1Alpha,
+                       }
+
+                       rm := NewReflectionManagerWithConfig(config)
+
+                       if rm.GetConfig().ContinueOnError != tt.continueOnError {
+                               t.Errorf("Expected ContinueOnError %v, got %v",
+                                       tt.continueOnError, rm.GetConfig().ContinueOnError)
+                       }
+               })
+       }
+}
+
+// TestReflectionManagerInvalidationCache tests cache invalidation
+func TestReflectionManagerInvalidationCache(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       // Invalidate non-existent address should not panic
+       rm.InvalidateCache("non-existent-address")
+
+       // Clear cache should not panic
+       rm.ClearCache()
+
+       // Close should not panic
+       rm.Close()
+}
+
+// TestTopologicalSortCircularDependency tests handling of circular dependencies
+func TestTopologicalSortCircularDependency(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       // Create circular dependency: A depends on B, B depends on A
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {
+                       Name:       strPtr("a.proto"),
+                       Dependency: []string{"b.proto"},
+                       Package:    strPtr("test"),
+               },
+               {
+                       Name:       strPtr("b.proto"),
+                       Dependency: []string{"a.proto"},
+                       Package:    strPtr("test"),
+               },
+       }
+
+       // Should handle circular dependency gracefully (return all files)
+       result, err := rm.topologicalSort(fileDescs)
+       if err != nil {
+               t.Errorf("Topological sort should handle circular dependencies, got error: %v", err)
+       }
+
+       if len(result) != 2 {
+               t.Errorf("Expected 2 files in result, got %d", len(result))
+       }
+}
+
+// TestTopologicalSortEmptyList tests empty file descriptor list
+func TestTopologicalSortEmptyList(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       result, err := rm.topologicalSort([]*descriptorpb.FileDescriptorProto{})
+       if err != nil {
+               t.Errorf("Expected no error for empty list, got: %v", err)
+       }
+
+       if len(result) != 0 {
+               t.Errorf("Expected empty result, got %d files", len(result))
+       }
+}
+
+// TestTopologicalSortWithExternalDependencies tests handling of external dependencies
+func TestTopologicalSortWithExternalDependencies(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {
+                       Name:       strPtr("a.proto"),
+                       Dependency: []string{"google/protobuf/empty.proto"},
+                       Package:    strPtr("test"),
+               },
+       }
+
+       // Should handle external dependency gracefully
+       result, err := rm.topologicalSort(fileDescs)
+       if err != nil {
+               t.Errorf("Expected no error with external dependency, got: %v", err)
+       }
+
+       if len(result) != 1 {
+               t.Errorf("Expected 1 file in result, got %d", len(result))
+       }
+}
+
+// TestReflectionVersionConstants tests reflection version constants
+func TestReflectionVersionConstants(t *testing.T) {
+       tests := []struct {
+               name    string
+               version ReflectionVersion
+               valid   bool
+       }{
+               {"v1alpha", ReflectionV1Alpha, true},
+               {"v1", ReflectionV1, true},
+               {"auto", ReflectionAuto, true},
+               {"invalid", ReflectionVersion("invalid"), true}, // Any string is valid
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if tt.version == "" {
+                               t.Error("Reflection version should not be empty")
+                       }
+               })
+       }
+}
+
+// TestNewReflectionManagerWithInvalidConfig tests configuration validation
+func TestNewReflectionManagerWithInvalidConfig(t *testing.T) {
+       tests := []struct {
+               name            string
+               config          ReflectionConfig
+               expectCacheSize int
+               expectTTL       time.Duration
+       }{
+               {
+                       name: "zero TTL uses default",
+                       config: ReflectionConfig{
+                               CacheTTL:        0,
+                               MaxCacheSize:    100,
+                               ContinueOnError: true,
+                       },
+                       expectCacheSize: 100,
+                       expectTTL:       defaultDescCacheTTL,
+               },
+               {
+                       name: "negative TTL uses default",
+                       config: ReflectionConfig{
+                               CacheTTL:        -1,
+                               MaxCacheSize:    100,
+                               ContinueOnError: true,
+                       },
+                       expectCacheSize: 100,
+                       expectTTL:       defaultDescCacheTTL,
+               },
+               {
+                       name: "cache size below minimum",
+                       config: ReflectionConfig{
+                               CacheTTL:        5 * time.Minute,
+                               MaxCacheSize:    10,
+                               ContinueOnError: true,
+                       },
+                       expectCacheSize: minCacheSize,
+                       expectTTL:       5 * time.Minute,
+               },
+               {
+                       name: "empty reflection version",
+                       config: ReflectionConfig{
+                               CacheTTL:          5 * time.Minute,
+                               MaxCacheSize:      100,
+                               ReflectionVersion: "",
+                               ContinueOnError:   true,
+                       },
+                       expectCacheSize: 100,
+                       expectTTL:       5 * time.Minute,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       rm := NewReflectionManagerWithConfig(tt.config)
+
+                       config := rm.GetConfig()
+                       if config.MaxCacheSize != tt.expectCacheSize {
+                               t.Errorf("Expected cache size %d, got %d", tt.expectCacheSize, config.MaxCacheSize)
+                       }
+
+                       if config.CacheTTL != tt.expectTTL {
+                               t.Errorf("Expected TTL %v, got %v", tt.expectTTL, config.CacheTTL)
+                       }
+               })
+       }
+}
+
+// Helper function for string pointers
+func strPtr(s string) *string {
+       return &s
+}
+
+// Init logger for tests
+func init() {
+       cfg := zap.NewDevelopmentConfig()
+       cfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
+       logger.InitLogger(&cfg)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go
new file mode 100644
index 00000000..9b937a09
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache.go
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "sync"
+       "sync/atomic"
+       "time"
+)
+
+import (
+       "github.com/dubbogo/gost/container/gxlru"
+
+       "google.golang.org/protobuf/reflect/protoreflect"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// Constants for cache configuration
+const (
+       defaultMaxCacheSize = 1000 // Maximum number of cached descriptors
+       minCacheSize        = 100  // Minimum cache size to prevent thrashing
+       // Logger prefix for descriptor cache operations
+       loggerPrefix = "[grpc-proxy-descriptor-cache] "
+)
+
+// cacheEntry represents a cached method descriptor with metadata
+// Implements gxlru.Value interface (Size always returns 1 for counting)
+type cacheEntry struct {
+       key         string
+       descriptor  protoreflect.MethodDescriptor
+       expireAt    time.Time
+       versionHash string // Hash of the descriptor for version detection
+}
+
+// Size implements gxlru.Value interface - returns 1 for per-entry counting
+func (e *cacheEntry) Size() int {
+       return 1
+}
+
+// CacheStats holds cache statistics for monitoring (returned by GetStats)
+type CacheStats struct {
+       Hits      int64
+       Misses    int64
+       Evictions int64
+       Size      int
+       MaxSize   int
+       TTL       time.Duration
+       HitRatio  float64
+}
+
+// DescriptorCache provides TTL-based caching with LRU eviction for gRPC method descriptors
+type DescriptorCache struct {
+       lru       *gxlru.LRUCache // LRU cache from gost
+       entries   sync.Map        // key -> *cacheEntry (for TTL and version lookup)
+       ttl       time.Duration
+       stopCh    chan struct{}
+       closeOnce sync.Once
+       maxSize   atomic.Int32 // Maximum cache size (atomic for lock-free reads)
+       // Atomic statistics for lock-free updates
+       hits      atomic.Int64
+       misses    atomic.Int64
+       evictions atomic.Int64
+}
+
+// NewDescriptorCache creates a new descriptor cache with the specified TTL
+func NewDescriptorCache(ttl time.Duration) *DescriptorCache {
+       return NewDescriptorCacheWithSize(ttl, defaultMaxCacheSize)
+}
+
+// NewDescriptorCacheWithSize creates a new descriptor cache with custom size limit
+func NewDescriptorCacheWithSize(ttl time.Duration, maxSize int) *DescriptorCache {
+       if maxSize < minCacheSize {
+               maxSize = minCacheSize
+               logger.Infof("%sCache size adjusted to minimum: %d", loggerPrefix, maxSize)
+       }
+
+       cache := &DescriptorCache{
+               lru:    gxlru.NewLRUCache(int64(maxSize)),
+               ttl:    ttl,
+               stopCh: make(chan struct{}),
+       }
+       cache.maxSize.Store(int32(maxSize))
+
+       go cache.cleanupLoop()
+       return cache
+}
+
+// Get retrieves a method descriptor from cache
+// Returns nil if not found or expired
+func (c *DescriptorCache) Get(key string) protoreflect.MethodDescriptor {
+       entry, ok := c.entries.Load(key)
+       if !ok {
+               c.misses.Add(1)
+               return nil
+       }
+
+       e := entry.(*cacheEntry)
+       if time.Now().Before(e.expireAt) {
+               // Valid cache hit
+               c.hits.Add(1)
+               return e.descriptor
+       }
+
+       // Entry expired, delete it
+       c.entries.Delete(key)
+       c.lru.Delete(key)
+       c.misses.Add(1)
+       return nil
+}
+
+// GetWithVersion retrieves a method descriptor and returns version mismatch status
+// Returns (descriptor, true) if found and version matches
+// Returns (descriptor, false) if found but version differs (stale)
+// Returns (nil, false) if not found or expired
+func (c *DescriptorCache) GetWithVersion(key string, versionHash string) (protoreflect.MethodDescriptor, bool) {
+       entry, ok := c.entries.Load(key)
+       if !ok {
+               c.misses.Add(1)
+               return nil, false
+       }
+
+       e := entry.(*cacheEntry)
+       if time.Now().After(e.expireAt) {
+               // Entry expired
+               c.entries.Delete(key)
+               c.lru.Delete(key)
+               c.misses.Add(1)
+               return nil, false
+       }
+
+       // Check version hash
+       if e.versionHash != "" && versionHash != "" && e.versionHash != versionHash {
+               // Version mismatch - stale cache
+               logger.Debugf("%sCache version mismatch for %s: cached=%s, current=%s",
+                       loggerPrefix, key, e.versionHash, versionHash)
+               c.entries.Delete(key)
+               c.lru.Delete(key)
+               c.misses.Add(1)
+               return nil, false
+       }
+
+       // Valid cache hit
+       c.hits.Add(1)
+       return e.descriptor, true
+}
+
+// Set stores a method descriptor in the cache with optional version hash
+func (c *DescriptorCache) Set(key string, descriptor protoreflect.MethodDescriptor) {
+       c.SetWithVersion(key, descriptor, "")
+}
+
+// SetWithVersion stores a method descriptor with version hash for change detection
+func (c *DescriptorCache) SetWithVersion(key string, descriptor protoreflect.MethodDescriptor, versionHash string) {
+       newEntry := &cacheEntry{
+               key:         key,
+               descriptor:  descriptor,
+               expireAt:    time.Now().Add(c.ttl),
+               versionHash: versionHash,
+       }
+
+       // Add to LRU cache (handles eviction automatically)
+       c.lru.Set(key, newEntry)
+
+       // Store in entries map for TTL/version lookup (possibly replacing old entry)
+       c.entries.Store(key, newEntry)
+}
+
+// Delete removes a specific entry from the cache
+func (c *DescriptorCache) Delete(key string) {
+       c.entries.Delete(key)
+       c.lru.Delete(key)
+}
+
+// InvalidateByVersion removes all entries with a specific version hash
+// Useful when service definitions are updated
+func (c *DescriptorCache) InvalidateByVersion(versionHash string) int {
+       count := 0
+       c.entries.Range(func(key, value any) bool {
+               entry := value.(*cacheEntry)
+               if entry.versionHash == versionHash {
+                       c.entries.Delete(key)
+                       c.lru.Delete(key.(string))
+                       count++
+               }
+               return true
+       })
+       if count > 0 {
+               logger.Infof("%sInvalidated %d cache entries with version hash: %s", loggerPrefix, count, versionHash)
+       }
+       return count
+}
+
+// Clear removes all entries from the cache
+func (c *DescriptorCache) Clear() {
+       c.entries.Range(func(key, value any) bool {
+               c.entries.Delete(key)
+               c.lru.Delete(key.(string))
+               return true
+       })
+       c.lru.Clear()
+}
+
+// Size returns the current number of entries in the cache
+func (c *DescriptorCache) Size() int {
+       return int(c.lru.Length())
+}
+
+// GetStats returns current cache statistics
+func (c *DescriptorCache) GetStats() CacheStats {
+       hits := c.hits.Load()
+       misses := c.misses.Load()
+       total := hits + misses
+
+       var hitRatio float64
+       if total > 0 {
+               hitRatio = float64(hits) / float64(total)
+       }
+
+       // Use gost's eviction count + our own tracking
+       evictions := c.evictions.Load() + c.lru.Evictions()
+
+       return CacheStats{
+               Hits:      hits,
+               Misses:    misses,
+               Evictions: evictions,
+               Size:      int(c.lru.Length()),
+               MaxSize:   int(c.maxSize.Load()),
+               TTL:       c.ttl,
+               HitRatio:  hitRatio,
+       }
+}
+
+// ResetStats clears cache statistics (atomic reset)
+func (c *DescriptorCache) ResetStats() {
+       c.hits.Store(0)
+       c.misses.Store(0)
+       c.evictions.Store(0)
+       // Note: gost's internal eviction counter cannot be reset
+}
+
+// SetMaxSize dynamically adjusts the maximum cache size
+func (c *DescriptorCache) SetMaxSize(size int) {
+       if size < minCacheSize {
+               size = minCacheSize
+       }
+       c.maxSize.Store(int32(size))
+       c.lru.SetCapacity(int64(size))
+}
+
+// cleanupLoop periodically removes expired entries
+func (c *DescriptorCache) cleanupLoop() {
+       ticker := time.NewTicker(c.ttl / 2)
+       defer ticker.Stop()
+
+       for {
+               select {
+               case <-ticker.C:
+                       now := time.Now()
+                       c.entries.Range(func(key, value any) bool {
+                               entry := value.(*cacheEntry)
+                               if now.After(entry.expireAt) {
+                                       c.entries.Delete(key)
+                                       c.lru.Delete(key.(string))
+                               }
+                               return true
+                       })
+               case <-c.stopCh:
+                       return
+               }
+       }
+}
+
+// Close stops the cleanup goroutine (safe to call multiple times)
+func (c *DescriptorCache) Close() {
+       c.closeOnce.Do(func() {
+               close(c.stopCh)
+       })
+}
+
+// BuildCacheKey creates a cache key from service and method names
+func BuildCacheKey(address, serviceName, methodName string) string {
+       return address + "/" + serviceName + "/" + methodName
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go
new file mode 100644
index 00000000..eb9adc4d
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/descriptor_cache_test.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "testing"
+       "time"
+)
+
+func TestDescriptorCache_SetAndGet(t *testing.T) {
+       cache := NewDescriptorCache(1 * time.Minute)
+       defer cache.Close()
+
+       key := "test/service/method"
+
+       // Initially should be nil
+       if got := cache.Get(key); got != nil {
+               t.Errorf("Get() on empty cache = %v, want nil", got)
+       }
+
+       // Set a nil value (valid for testing)
+       cache.Set(key, nil)
+
+       // Should be able to get it back (even if nil)
+       // Note: Our implementation stores the entry but returns nil descriptor
+       // This is acceptable behavior as it caches the "not found" state
+}
+
+func TestDescriptorCache_Expiration(t *testing.T) {
+       // Use very short TTL for testing
+       ttl := 100 * time.Millisecond
+       cache := NewDescriptorCache(ttl)
+       defer cache.Close()
+
+       key := "test/service/method"
+       cache.Set(key, nil)
+
+       // Should exist before expiration
+       // Wait a bit less than TTL
+       time.Sleep(50 * time.Millisecond)
+
+       // Entry should still be valid (we can't test the descriptor value easily
+       // without a real MethodDescriptor, but we can verify the cache mechanics)
+
+       // Wait for expiration
+       time.Sleep(100 * time.Millisecond)
+
+       // After expiration, Get should return nil
+       if got := cache.Get(key); got != nil {
+               t.Errorf("Get() after expiration = %v, want nil", got)
+       }
+}
+
+func TestDescriptorCache_Delete(t *testing.T) {
+       cache := NewDescriptorCache(1 * time.Minute)
+       defer cache.Close()
+
+       key := "test/service/method"
+       cache.Set(key, nil)
+
+       // Delete the entry
+       cache.Delete(key)
+
+       // Should be nil after deletion
+       if got := cache.Get(key); got != nil {
+               t.Errorf("Get() after Delete() = %v, want nil", got)
+       }
+}
+
+func TestDescriptorCache_Clear(t *testing.T) {
+       cache := NewDescriptorCache(1 * time.Minute)
+       defer cache.Close()
+
+       // Add multiple entries
+       keys := []string{"key1", "key2", "key3"}
+       for _, k := range keys {
+               cache.Set(k, nil)
+       }
+
+       // Verify size
+       if size := cache.Size(); size != 3 {
+               t.Errorf("Size() = %d, want 3", size)
+       }
+
+       // Clear all
+       cache.Clear()
+
+       // Verify all cleared
+       if size := cache.Size(); size != 0 {
+               t.Errorf("Size() after Clear() = %d, want 0", size)
+       }
+
+       for _, k := range keys {
+               if got := cache.Get(k); got != nil {
+                       t.Errorf("Get(%q) after Clear() = %v, want nil", k, got)
+               }
+       }
+}
+
+func TestDescriptorCache_Size(t *testing.T) {
+       cache := NewDescriptorCache(1 * time.Minute)
+       defer cache.Close()
+
+       // Initially empty
+       if size := cache.Size(); size != 0 {
+               t.Errorf("Size() on empty cache = %d, want 0", size)
+       }
+
+       // Add entries
+       for i := 0; i < 5; i++ {
+               cache.Set("key"+string(rune('0'+i)), nil)
+       }
+
+       if size := cache.Size(); size != 5 {
+               t.Errorf("Size() = %d, want 5", size)
+       }
+}
+
+func TestBuildCacheKey(t *testing.T) {
+       tests := []struct {
+               address     string
+               serviceName string
+               methodName  string
+               expected    string
+       }{
+               {
+                       address:     "localhost:50051",
+                       serviceName: "helloworld.Greeter",
+                       methodName:  "SayHello",
+                       expected:    "localhost:50051/helloworld.Greeter/SayHello",
+               },
+               {
+                       address:     "backend.example.com:8080",
+                       serviceName: "my.package.Service",
+                       methodName:  "Method",
+                       expected:    "backend.example.com:8080/my.package.Service/Method",
+               },
+               {
+                       address:     "",
+                       serviceName: "Service",
+                       methodName:  "Method",
+                       expected:    "/Service/Method",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.expected, func(t *testing.T) {
+                       result := BuildCacheKey(tt.address, tt.serviceName, tt.methodName)
+                       if result != tt.expected {
+                               t.Errorf("BuildCacheKey(%q, %q, %q) = %q, want %q",
+                                       tt.address, tt.serviceName, tt.methodName, result, tt.expected)
+                       }
+               })
+       }
+}
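[Editor's note, not part of the patch] The `DynamicCodec` and `ResponseCodec` added in the next file satisfy the three-method shape of gRPC's `encoding.Codec` interface (`Marshal`/`Unmarshal`/`Name`), and both rely on the same type-switch trick: raw `[]byte` passes through untouched for passthrough mode, while typed messages are (de)serialized. A stdlib-only sketch of that pattern, with the interface reproduced locally and JSON standing in for protobuf (all names here are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec reproduces the method set of google.golang.org/grpc/encoding.Codec
// locally so this sketch needs no gRPC dependency.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	Name() string
}

// passthroughJSON demonstrates the type-switch used by DynamicCodec:
// []byte is treated as already-encoded wire data, everything else is encoded.
type passthroughJSON struct{}

func (passthroughJSON) Marshal(v any) ([]byte, error) {
	if b, ok := v.([]byte); ok {
		return b, nil // passthrough: already wire bytes
	}
	return json.Marshal(v)
}

func (passthroughJSON) Unmarshal(data []byte, v any) error {
	if b, ok := v.(*[]byte); ok {
		*b = data // passthrough mode: hand bytes through unparsed
		return nil
	}
	return json.Unmarshal(data, v)
}

func (passthroughJSON) Name() string { return "passthrough_json" }

func main() {
	var c Codec = passthroughJSON{}
	b, _ := c.Marshal(map[string]int{"a": 1})
	var raw []byte
	_ = c.Unmarshal(b, &raw) // passthrough branch: raw == b
	fmt.Println(string(raw))
}
```

In the real filter the non-passthrough branch builds a `dynamicpb.Message` from the method descriptor instead of using JSON, which is what lets the gateway inspect fields without generated types.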
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go
new file mode 100644
index 00000000..c9943c7a
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec.go
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "fmt"
+)
+
+import (
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "google.golang.org/protobuf/proto"
+
+       "google.golang.org/protobuf/reflect/protoreflect"
+
+       "google.golang.org/protobuf/types/dynamicpb"
+)
+
+// DynamicCodec is a gRPC codec that uses protobuf reflection to encode/decode messages.
+// This enables the gateway to inspect and manipulate message content.
+type DynamicCodec struct {
+       methodDesc protoreflect.MethodDescriptor
+}
+
+// NewDynamicCodec creates a new DynamicCodec for the given method descriptor
+func NewDynamicCodec(methodDesc protoreflect.MethodDescriptor) *DynamicCodec {
+       return &DynamicCodec{
+               methodDesc: methodDesc,
+       }
+}
+
+// Marshal encodes a message to bytes
+func (c *DynamicCodec) Marshal(v any) ([]byte, error) {
+       switch msg := v.(type) {
+       case *dynamicpb.Message:
+               return proto.Marshal(msg)
+       case proto.Message:
+               return proto.Marshal(msg)
+       case []byte:
+               // Already bytes, pass through
+               return msg, nil
+       case *DynamicMessage:
+               return proto.Marshal(msg.Message)
+       default:
+               return nil, fmt.Errorf("dynamic codec: cannot marshal type %T", v)
+       }
+}
+
+// Unmarshal decodes bytes into a message
+func (c *DynamicCodec) Unmarshal(data []byte, v any) error {
+       switch msg := v.(type) {
+       case **DynamicMessage:
+               // Create dynamic message for input type
+               inputType := c.methodDesc.Input()
+               dynMsg := dynamicpb.NewMessage(inputType)
+               if err := proto.Unmarshal(data, dynMsg); err != nil {
+                       return fmt.Errorf("dynamic codec: unmarshal error: %w", err)
+               }
+               *msg = &DynamicMessage{
+                       Message:    dynMsg,
+                       Descriptor: inputType,
+               }
+               return nil
+       case *[]byte:
+               // Passthrough mode
+               *msg = data
+               return nil
+       case proto.Message:
+               return proto.Unmarshal(data, msg)
+       default:
+               return fmt.Errorf("dynamic codec: cannot unmarshal into type %T", v)
+       }
+}
+
+// Name returns the codec name
+func (c *DynamicCodec) Name() string {
+       return "dynamic_proto"
+}
+
+// DynamicMessage wraps a dynamicpb.Message with its descriptor
+type DynamicMessage struct {
+       Message    *dynamicpb.Message
+       Descriptor protoreflect.MessageDescriptor
+}
+
+// GetField retrieves a field value by name
+func (dm *DynamicMessage) GetField(name string) (protoreflect.Value, bool) {
+       fd := dm.Descriptor.Fields().ByName(protoreflect.Name(name))
+       if fd == nil {
+               return protoreflect.Value{}, false
+       }
+       return dm.Message.Get(fd), true
+}
+
+// GetFieldString retrieves a string field value by name
+func (dm *DynamicMessage) GetFieldString(name string) (string, bool) {
+       val, ok := dm.GetField(name)
+       if !ok {
+               return "", false
+       }
+       return val.String(), true
+}
+
+// GetFieldInt retrieves an int64 field value by name
+func (dm *DynamicMessage) GetFieldInt(name string) (int64, bool) {
+       val, ok := dm.GetField(name)
+       if !ok {
+               return 0, false
+       }
+       return val.Int(), true
+}
+
+// SetField sets a field value by name
+func (dm *DynamicMessage) SetField(name string, value protoreflect.Value) bool 
{
+       fd := dm.Descriptor.Fields().ByName(protoreflect.Name(name))
+       if fd == nil {
+               return false
+       }
+       dm.Message.Set(fd, value)
+       return true
+}
+
+// ToBytes serializes the message to bytes
+func (dm *DynamicMessage) ToBytes() ([]byte, error) {
+       return proto.Marshal(dm.Message)
+}
+
+// ToJSON converts the message to JSON (for logging/debugging)
+func (dm *DynamicMessage) ToJSON() ([]byte, error) {
+       return protojson.Marshal(dm.Message)
+}
+
+// ResponseCodec is a codec for handling response messages
+type ResponseCodec struct {
+       methodDesc protoreflect.MethodDescriptor
+}
+
+// NewResponseCodec creates a codec for response messages
+func NewResponseCodec(methodDesc protoreflect.MethodDescriptor) *ResponseCodec {
+       return &ResponseCodec{
+               methodDesc: methodDesc,
+       }
+}
+
+// Marshal encodes a response message
+func (c *ResponseCodec) Marshal(v any) ([]byte, error) {
+       switch msg := v.(type) {
+       case *dynamicpb.Message:
+               return proto.Marshal(msg)
+       case proto.Message:
+               return proto.Marshal(msg)
+       case []byte:
+               return msg, nil
+       case *DynamicMessage:
+               return proto.Marshal(msg.Message)
+       default:
+               return nil, fmt.Errorf("response codec: cannot marshal type %T", v)
+       }
+}
+
+// Unmarshal decodes response bytes
+func (c *ResponseCodec) Unmarshal(data []byte, v any) error {
+       switch msg := v.(type) {
+       case **DynamicMessage:
+               // Create dynamic message for output type
+               outputType := c.methodDesc.Output()
+               dynMsg := dynamicpb.NewMessage(outputType)
+               if err := proto.Unmarshal(data, dynMsg); err != nil {
+                       return fmt.Errorf("response codec: unmarshal error: %w", err)
+               }
+               *msg = &DynamicMessage{
+                       Message:    dynMsg,
+                       Descriptor: outputType,
+               }
+               return nil
+       case *[]byte:
+               *msg = data
+               return nil
+       case proto.Message:
+               return proto.Unmarshal(data, msg)
+       default:
+               return fmt.Errorf("response codec: cannot unmarshal into type %T", v)
+       }
+}
+
+// Name returns the codec name
+func (c *ResponseCodec) Name() string {
+       return "dynamic_proto"
+}
+
+// DecodeRequest decodes a request using the method descriptor
+func DecodeRequest(methodDesc protoreflect.MethodDescriptor, data []byte) (*DynamicMessage, error) {
+       inputType := methodDesc.Input()
+       msg := dynamicpb.NewMessage(inputType)
+       if err := proto.Unmarshal(data, msg); err != nil {
+               return nil, fmt.Errorf("failed to decode request: %w", err)
+       }
+       return &DynamicMessage{
+               Message:    msg,
+               Descriptor: inputType,
+       }, nil
+}
+
+// DecodeResponse decodes a response using the method descriptor
+func DecodeResponse(methodDesc protoreflect.MethodDescriptor, data []byte) (*DynamicMessage, error) {
+       outputType := methodDesc.Output()
+       msg := dynamicpb.NewMessage(outputType)
+       if err := proto.Unmarshal(data, msg); err != nil {
+               return nil, fmt.Errorf("failed to decode response: %w", err)
+       }
+       return &DynamicMessage{
+               Message:    msg,
+               Descriptor: outputType,
+       }, nil
+}
+
+// EncodeMessage encodes a dynamic message to bytes
+func EncodeMessage(msg *DynamicMessage) ([]byte, error) {
+       return proto.Marshal(msg.Message)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go
new file mode 100644
index 00000000..40ba3f35
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/dynamic_codec_test.go
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "strings"
+       "testing"
+)
+
+import (
+       "google.golang.org/protobuf/proto"
+
+       "google.golang.org/protobuf/reflect/protodesc"
+       "google.golang.org/protobuf/reflect/protoreflect"
+
+       "google.golang.org/protobuf/types/descriptorpb"
+       "google.golang.org/protobuf/types/dynamicpb"
+)
+
+// buildTestMethodDescriptor creates a test method descriptor for unit testing
+func buildTestMethodDescriptor(t *testing.T) protoreflect.MethodDescriptor {
+       // Define a simple proto file descriptor
+       fileDescProto := &descriptorpb.FileDescriptorProto{
+               Name:    proto.String("test.proto"),
+               Package: proto.String("test"),
+               MessageType: []*descriptorpb.DescriptorProto{
+                       {
+                               Name: proto.String("TestRequest"),
+                               Field: []*descriptorpb.FieldDescriptorProto{
+                                       {
+                                               Name:   proto.String("name"),
+                                               Number: proto.Int32(1),
+                                               Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+                                               Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                       },
+                                       {
+                                               Name:   proto.String("id"),
+                                               Number: proto.Int32(2),
+                                               Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
+                                               Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                       },
+                               },
+                       },
+                       {
+                               Name: proto.String("TestResponse"),
+                               Field: []*descriptorpb.FieldDescriptorProto{
+                                       {
+                                               Name:   proto.String("message"),
+                                               Number: proto.Int32(1),
+                                               Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+                                               Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                       },
+                               },
+                       },
+               },
+               Service: []*descriptorpb.ServiceDescriptorProto{
+                       {
+                               Name: proto.String("TestService"),
+                               Method: []*descriptorpb.MethodDescriptorProto{
+                                       {
+                                               Name:       proto.String("TestMethod"),
+                                               InputType:  proto.String(".test.TestRequest"),
+                                               OutputType: proto.String(".test.TestResponse"),
+                                       },
+                               },
+                       },
+               },
+       }
+
+       // Build file descriptor
+       fileDesc, err := protodesc.NewFile(fileDescProto, nil)
+       if err != nil {
+               t.Fatalf("Failed to create file descriptor: %v", err)
+       }
+
+       // Get service and method
+       svcDesc := fileDesc.Services().ByName("TestService")
+       if svcDesc == nil {
+               t.Fatal("TestService not found")
+       }
+
+       methodDesc := svcDesc.Methods().ByName("TestMethod")
+       if methodDesc == nil {
+               t.Fatal("TestMethod not found")
+       }
+
+       return methodDesc
+}
+
+func TestDynamicCodec_Name(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       if got := codec.Name(); got != "dynamic_proto" {
+               t.Errorf("Name() = %q, want %q", got, "dynamic_proto")
+       }
+}
+
+func TestDynamicCodec_MarshalBytes(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       // Test marshaling raw bytes (passthrough)
+       data := []byte{0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
+       result, err := codec.Marshal(data)
+       if err != nil {
+               t.Fatalf("Marshal([]byte) error = %v", err)
+       }
+       if string(result) != string(data) {
+               t.Errorf("Marshal([]byte) = %v, want %v", result, data)
+       }
+}
+
+func TestDynamicCodec_MarshalDynamicMessage(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       // Create a dynamic message
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       // Set field values
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("test"))
+
+       // Marshal as DynamicMessage wrapper
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       result, err := codec.Marshal(dm)
+       if err != nil {
+               t.Fatalf("Marshal(*DynamicMessage) error = %v", err)
+       }
+
+       // Verify by unmarshaling
+       newMsg := dynamicpb.NewMessage(inputType)
+       if err := proto.Unmarshal(result, newMsg); err != nil {
+               t.Fatalf("Unmarshal verification error = %v", err)
+       }
+
+       if got := newMsg.Get(nameField).String(); got != "test" {
+               t.Errorf("Unmarshaled name = %q, want %q", got, "test")
+       }
+}
+
+func TestDynamicCodec_MarshalUnsupportedType(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       // Try to marshal unsupported type
+       _, err := codec.Marshal(123)
+       if err == nil {
+               t.Error("Marshal(int) should return error")
+       }
+}
+
+func TestDynamicCodec_UnmarshalToBytes(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       data := []byte{0x0a, 0x05, 0x68, 0x65, 0x6c, 0x6c, 0x6f}
+       var result []byte
+       err := codec.Unmarshal(data, &result)
+       if err != nil {
+               t.Fatalf("Unmarshal(to []byte) error = %v", err)
+       }
+       if string(result) != string(data) {
+               t.Errorf("Unmarshal result = %v, want %v", result, data)
+       }
+}
+
+func TestDynamicCodec_UnmarshalToDynamicMessage(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       // Create test data
+       inputType := methodDesc.Input()
+       srcMsg := dynamicpb.NewMessage(inputType)
+       nameField := inputType.Fields().ByName("name")
+       idField := inputType.Fields().ByName("id")
+       srcMsg.Set(nameField, protoreflect.ValueOfString("hello"))
+       srcMsg.Set(idField, protoreflect.ValueOfInt64(42))
+
+       data, err := proto.Marshal(srcMsg)
+       if err != nil {
+               t.Fatalf("Failed to marshal source message: %v", err)
+       }
+
+       // Unmarshal to DynamicMessage
+       var dm *DynamicMessage
+       err = codec.Unmarshal(data, &dm)
+       if err != nil {
+               t.Fatalf("Unmarshal(to *DynamicMessage) error = %v", err)
+       }
+
+       if dm == nil {
+               t.Fatal("Unmarshal result is nil")
+       }
+
+       // Verify fields
+       if got := dm.Message.Get(nameField).String(); got != "hello" {
+               t.Errorf("name = %q, want %q", got, "hello")
+       }
+       if got := dm.Message.Get(idField).Int(); got != 42 {
+               t.Errorf("id = %d, want %d", got, 42)
+       }
+}
+
+func TestDynamicCodec_UnmarshalUnsupportedType(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewDynamicCodec(methodDesc)
+
+       data := []byte{0x0a, 0x05}
+       var result int
+       err := codec.Unmarshal(data, &result)
+       if err == nil {
+               t.Error("Unmarshal(to int) should return error")
+       }
+}
+
+func TestDynamicMessage_GetField(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("test_value"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       // Test GetField with existing field
+       val, ok := dm.GetField("name")
+       if !ok {
+               t.Error("GetField(name) returned false")
+       }
+       if val.String() != "test_value" {
+               t.Errorf("GetField(name) = %q, want %q", val.String(), "test_value")
+       }
+
+       // Test GetField with non-existing field
+       _, ok = dm.GetField("nonexistent")
+       if ok {
+               t.Error("GetField(nonexistent) should return false")
+       }
+}
+
+func TestDynamicMessage_GetFieldString(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("hello_world"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       // Test existing field
+       val, ok := dm.GetFieldString("name")
+       if !ok {
+               t.Error("GetFieldString(name) returned false")
+       }
+       if val != "hello_world" {
+               t.Errorf("GetFieldString(name) = %q, want %q", val, "hello_world")
+       }
+
+       // Test non-existing field
+       _, ok = dm.GetFieldString("nonexistent")
+       if ok {
+               t.Error("GetFieldString(nonexistent) should return false")
+       }
+}
+
+func TestDynamicMessage_GetFieldInt(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       idField := inputType.Fields().ByName("id")
+       dynMsg.Set(idField, protoreflect.ValueOfInt64(12345))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       // Test existing field
+       val, ok := dm.GetFieldInt("id")
+       if !ok {
+               t.Error("GetFieldInt(id) returned false")
+       }
+       if val != 12345 {
+               t.Errorf("GetFieldInt(id) = %d, want %d", val, 12345)
+       }
+
+       // Test non-existing field
+       _, ok = dm.GetFieldInt("nonexistent")
+       if ok {
+               t.Error("GetFieldInt(nonexistent) should return false")
+       }
+}
+
+func TestDynamicMessage_SetField(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       // Set existing field
+       ok := dm.SetField("name", protoreflect.ValueOfString("new_value"))
+       if !ok {
+               t.Error("SetField(name) returned false")
+       }
+
+       // Verify
+       val, _ := dm.GetFieldString("name")
+       if val != "new_value" {
+               t.Errorf("After SetField, name = %q, want %q", val, "new_value")
+       }
+
+       // Set non-existing field
+       ok = dm.SetField("nonexistent", protoreflect.ValueOfString("value"))
+       if ok {
+               t.Error("SetField(nonexistent) should return false")
+       }
+}
+
+func TestDynamicMessage_ToBytes(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("serialize_test"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       data, err := dm.ToBytes()
+       if err != nil {
+               t.Fatalf("ToBytes() error = %v", err)
+       }
+
+       // Verify by unmarshaling
+       newMsg := dynamicpb.NewMessage(inputType)
+       if err := proto.Unmarshal(data, newMsg); err != nil {
+               t.Fatalf("Unmarshal verification error = %v", err)
+       }
+
+       if got := newMsg.Get(nameField).String(); got != "serialize_test" {
+               t.Errorf("After ToBytes, name = %q, want %q", got, "serialize_test")
+       }
+}
+
+func TestDynamicMessage_ToJSON(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+       dynMsg := dynamicpb.NewMessage(inputType)
+
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("json_test"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       data, err := dm.ToJSON()
+       if err != nil {
+               t.Fatalf("ToJSON() error = %v", err)
+       }
+
+       // Verify it returns valid JSON with the expected field
+       if len(data) == 0 {
+               t.Error("ToJSON() returned empty data")
+       }
+
+       // Should contain the field name in JSON format
+       jsonStr := string(data)
+       if !strings.Contains(jsonStr, "name") || !strings.Contains(jsonStr, "json_test") {
+               t.Errorf("ToJSON() = %s, expected to contain 'name' and 'json_test'", jsonStr)
+       }
+}
+
+func TestResponseCodec_Name(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewResponseCodec(methodDesc)
+
+       if got := codec.Name(); got != "dynamic_proto" {
+               t.Errorf("Name() = %q, want %q", got, "dynamic_proto")
+       }
+}
+
+func TestResponseCodec_MarshalAndUnmarshal(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       codec := NewResponseCodec(methodDesc)
+
+       // Create output message
+       outputType := methodDesc.Output()
+       dynMsg := dynamicpb.NewMessage(outputType)
+       msgField := outputType.Fields().ByName("message")
+       dynMsg.Set(msgField, protoreflect.ValueOfString("response_value"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: outputType,
+       }
+
+       // Marshal
+       data, err := codec.Marshal(dm)
+       if err != nil {
+               t.Fatalf("Marshal error = %v", err)
+       }
+
+       // Unmarshal
+       var result *DynamicMessage
+       err = codec.Unmarshal(data, &result)
+       if err != nil {
+               t.Fatalf("Unmarshal error = %v", err)
+       }
+
+       // Verify
+       val, ok := result.GetFieldString("message")
+       if !ok {
+               t.Error("GetFieldString(message) returned false")
+       }
+       if val != "response_value" {
+               t.Errorf("message = %q, want %q", val, "response_value")
+       }
+}
+
+func TestDecodeRequest(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+
+       // Create test message
+       srcMsg := dynamicpb.NewMessage(inputType)
+       nameField := inputType.Fields().ByName("name")
+       srcMsg.Set(nameField, protoreflect.ValueOfString("decode_test"))
+
+       data, err := proto.Marshal(srcMsg)
+       if err != nil {
+               t.Fatalf("Marshal error = %v", err)
+       }
+
+       // Decode
+       result, err := DecodeRequest(methodDesc, data)
+       if err != nil {
+               t.Fatalf("DecodeRequest error = %v", err)
+       }
+
+       // Verify
+       val, ok := result.GetFieldString("name")
+       if !ok {
+               t.Error("GetFieldString(name) returned false")
+       }
+       if val != "decode_test" {
+               t.Errorf("name = %q, want %q", val, "decode_test")
+       }
+}
+
+func TestDecodeRequest_InvalidData(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+
+       // Invalid protobuf data
+       invalidData := []byte{0xff, 0xff, 0xff}
+       _, err := DecodeRequest(methodDesc, invalidData)
+       if err == nil {
+               t.Error("DecodeRequest with invalid data should return error")
+       }
+}
+
+func TestDecodeResponse(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       outputType := methodDesc.Output()
+
+       // Create test message
+       srcMsg := dynamicpb.NewMessage(outputType)
+       msgField := outputType.Fields().ByName("message")
+       srcMsg.Set(msgField, protoreflect.ValueOfString("response_decode_test"))
+
+       data, err := proto.Marshal(srcMsg)
+       if err != nil {
+               t.Fatalf("Marshal error = %v", err)
+       }
+
+       // Decode
+       result, err := DecodeResponse(methodDesc, data)
+       if err != nil {
+               t.Fatalf("DecodeResponse error = %v", err)
+       }
+
+       // Verify
+       val, ok := result.GetFieldString("message")
+       if !ok {
+               t.Error("GetFieldString(message) returned false")
+       }
+       if val != "response_decode_test" {
+               t.Errorf("message = %q, want %q", val, "response_decode_test")
+       }
+}
+
+func TestEncodeMessage(t *testing.T) {
+       methodDesc := buildTestMethodDescriptor(t)
+       inputType := methodDesc.Input()
+
+       dynMsg := dynamicpb.NewMessage(inputType)
+       nameField := inputType.Fields().ByName("name")
+       dynMsg.Set(nameField, protoreflect.ValueOfString("encode_test"))
+
+       dm := &DynamicMessage{
+               Message:    dynMsg,
+               Descriptor: inputType,
+       }
+
+       data, err := EncodeMessage(dm)
+       if err != nil {
+               t.Fatalf("EncodeMessage error = %v", err)
+       }
+
+       // Verify by decoding
+       result, err := DecodeRequest(methodDesc, data)
+       if err != nil {
+               t.Fatalf("DecodeRequest verification error = %v", err)
+       }
+
+       val, _ := result.GetFieldString("name")
+       if val != "encode_test" {
+               t.Errorf("After EncodeMessage, name = %q, want %q", val, "encode_test")
+       }
+}
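For reference (not part of the patch), the `grpc_proxy_filter.go` changes below add three yaml-tagged config fields: `reflection_mode`, `descriptor_cache_ttl`, and `extract_triple_metadata`. A minimal sketch of how they might be set in a Pixiu listener config; the filter name and surrounding keys are illustrative only, while the three fields themselves come from this patch:

```yaml
# Illustrative surrounding structure; only the three config fields
# below are defined by this patch.
filters:
  - name: grpcproxy          # hypothetical filter name for this sketch
    config:
      reflection_mode: hybrid          # passthrough (default) | reflection | hybrid
      descriptor_cache_ttl: 10m        # TTL for cached method descriptors (default 5m)
      extract_triple_metadata: true    # surface Triple metadata as attachments
```

With `hybrid`, a reflection failure logs a warning and falls back to the passthrough codec rather than failing the call, as the filter code below shows.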
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
index eb1522cc..7880d878 100644
--- a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
+++ b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter.go
@@ -35,6 +35,8 @@ import (
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/keepalive"
        "google.golang.org/grpc/metadata"
+
+       "google.golang.org/protobuf/reflect/protoreflect"
 )
 
 import (
@@ -54,6 +56,17 @@ const (
        defaultConnectTimeout      = 5 * time.Second
        defaultMaxMsgSize          = 4 * 1024 * 1024 // 4MB
        defaultHealthCheckInterval = 30 * time.Second
+       defaultDescriptorCacheTTL  = 5 * time.Minute
+)
+
+// Reflection modes supported by the gRPC proxy filter
+const (
+       // ReflectionModePassthrough performs transparent binary proxying (default, highest performance)
+       ReflectionModePassthrough = "passthrough"
+       // ReflectionModeReflection uses gRPC reflection to decode/encode messages (content-aware)
+       ReflectionModeReflection = "reflection"
+       // ReflectionModeHybrid tries reflection first, falls back to passthrough on failure
+       ReflectionModeHybrid = "hybrid"
 )
 
 func init() {
@@ -76,13 +89,20 @@ type (
                KeepAliveTime        time.Duration `yaml:"-" json:"-"`
                KeepAliveTimeout     time.Duration `yaml:"-" json:"-"`
                ConnectTimeout       time.Duration `yaml:"-" json:"-"`
+
+               // ReflectionMode: "passthrough" (default), "reflection", or "hybrid"
+               ReflectionMode        string        `yaml:"reflection_mode" json:"reflection_mode" mapstructure:"reflection_mode"`
+               DescriptorCacheTTLStr string        `yaml:"descriptor_cache_ttl" json:"descriptor_cache_ttl" mapstructure:"descriptor_cache_ttl"`
+               DescriptorCacheTTL    time.Duration `yaml:"-" json:"-"`
+               ExtractTripleMetadata bool          `yaml:"extract_triple_metadata" json:"extract_triple_metadata" mapstructure:"extract_triple_metadata"`
        }
 
        // Filter implements the gRPC proxy filter
        Filter struct {
-               Config         *Config
-               clientConnPool sync.Map     // address -> *grpc.ClientConn
-               mu             sync.RWMutex // protects concurrent operations
+               Config            *Config
+               clientConnPool    sync.Map     // address -> *grpc.ClientConn
+               mu                sync.RWMutex // protects concurrent operations
+               reflectionManager *ReflectionManager
        }
 )
 
@@ -102,8 +122,25 @@ func (p Plugin) CreateFilter(config any) (filter.GrpcFilter, error) {
        cfg.KeepAliveTime = parseDurationWithDefault(cfg.KeepAliveTimeStr, defaultKeepAliveTime)
        cfg.KeepAliveTimeout = parseDurationWithDefault(cfg.KeepAliveTimeoutStr, defaultKeepAliveTimeout)
        cfg.ConnectTimeout = parseDurationWithDefault(cfg.ConnectTimeoutStr, defaultConnectTimeout)
+       cfg.DescriptorCacheTTL = parseDurationWithDefault(cfg.DescriptorCacheTTLStr, defaultDescriptorCacheTTL)
+
+       // Set default reflection mode if not specified
+       if cfg.ReflectionMode == "" {
+               cfg.ReflectionMode = ReflectionModePassthrough
+       }
+
+       f := &Filter{Config: cfg}
 
-       return &Filter{Config: cfg}, nil
+       // Initialize reflection manager if reflection mode is enabled
+       if cfg.ReflectionMode == ReflectionModeReflection || cfg.ReflectionMode == ReflectionModeHybrid {
+               f.reflectionManager = NewReflectionManager(cfg.DescriptorCacheTTL)
+               logger.Infof("gRPC proxy filter initialized with reflection mode: %s, cache TTL: %s",
+                       cfg.ReflectionMode, cfg.DescriptorCacheTTL)
+       } else {
+               logger.Infof("gRPC proxy filter initialized with passthrough mode")
+       }
+
+       return f, nil
 }
 
// Config exposes the config so that the Filter Manager can inject it, so it must be a pointer
@@ -160,77 +197,98 @@ func (f *Filter) Handle(ctx *grpcCtx.GrpcContext) filter.FilterStatus {
 
 // handleStream handles all types of gRPC calls by creating a full-duplex stream pipe.
 func (f *Filter) handleStream(ctx *grpcCtx.GrpcContext, address string) filter.FilterStatus {
-       // Get or create connection
        conn, err := f.getOrCreateConnection(address)
        if err != nil {
                ctx.SetError(errors.Errorf("gRPC proxy failed to get connection: %v", err))
                return filter.Stop
        }
 
-       // Set metadata for the outgoing context
        md := make(metadata.MD)
        for k, v := range ctx.Attachments {
                if str, ok := v.(string); ok {
                        md.Set(k, str)
                }
        }
-       outCtx := metadata.NewOutgoingContext(ctx.Context, md)
 
-       // Create the full method path for the gRPC call
+       if f.Config.ExtractTripleMetadata {
+               tripleMeta := ExtractTripleMetadata(ctx.Attachments)
+               if len(tripleMeta) > 0 {
+                       ctx.SetAttachment("_triple_metadata", tripleMeta)
+                       logger.Debugf("Extracted Triple metadata: %v", tripleMeta)
+               }
+       }
+
+       outCtx := metadata.NewOutgoingContext(ctx.Context, md)
        fullMethod := ctx.ServiceName + "/" + ctx.MethodName
-       // logger.Debugf("[dubbo-go-pixiu] gRPC proxy bidirectional stream to %s", fullMethod)
 
-       // Create a new client stream to the backend
+       var codecOpt grpc.CallOption
+       var useReflection bool
+
+       if f.reflectionManager != nil && f.Config.ReflectionMode != ReflectionModePassthrough {
+               methodDesc, err := f.reflectionManager.GetMethodDescriptor(
+                       ctx.Context, conn, address, ctx.ServiceName, ctx.MethodName)
+               if err != nil {
+                       if f.Config.ReflectionMode == ReflectionModeHybrid {
+                               logger.Warnf("Reflection failed for %s, falling back to passthrough: %v", fullMethod, err)
+                               codecOpt = grpc.ForceCodec(ptcodec.Codec{})
+                               useReflection = false
+                       } else {
+                               ctx.SetError(errors.Wrapf(err, "reflection failed for %s", fullMethod))
+                               return filter.Stop
+                       }
+               } else {
+                       ctx.SetAttachment("_method_descriptor", methodDesc)
+                       ctx.SetAttachment("_is_client_streaming", methodDesc.IsStreamingClient())
+                       logger.Debugf("Reflection succeeded for %s, input: %s, output: %s, clientStream: %v",
+                               fullMethod, methodDesc.Input().FullName(), methodDesc.Output().FullName(), methodDesc.IsStreamingClient())
+                       codecOpt = grpc.ForceCodec(NewDynamicCodec(methodDesc))
+                       useReflection = true
+               }
+       } else {
+               codecOpt = grpc.ForceCodec(ptcodec.Codec{})
+               useReflection = false
+       }
+
+       ctx.SetAttachment("_use_reflection", useReflection)
+
        clientStream, err := conn.NewStream(outCtx, &grpc.StreamDesc{
                StreamName:    ctx.MethodName,
                ServerStreams: true,
                ClientStreams: true,
-       }, fullMethod, grpc.ForceCodec(ptcodec.Codec{}))
-
+       }, fullMethod, codecOpt)
        if err != nil {
                ctx.SetError(errors.Errorf("failed to create client stream: %v", err))
                return filter.Stop
        }
 
-       // Ensure there is a server stream to work with
        if ctx.Stream == nil {
                ctx.SetError(errors.New("no stream available in context"))
                return filter.Stop
        }
 
-       // Use a WaitGroup to coordinate the two forwarding goroutines
        var wg sync.WaitGroup
        wg.Add(2)
-
-       // Channels for error propagation and termination signaling
        errChan := make(chan error, 2)
        doneChan := make(chan struct{})
 
-       // Start forwarding data in both directions
        go f.forwardClientToServer(ctx, clientStream, &wg, errChan, doneChan)
        go f.forwardServerToClient(ctx, clientStream, &wg, errChan, doneChan)
 
-       // Goroutine to wait for context cancellation or the first error
        go func() {
                select {
                case <-ctx.Context.Done():
-                       // If the client context is canceled, signal the forwarding goroutines to stop
                        close(doneChan)
                case err := <-errChan:
-                       // If an error occurs, propagate it and signal termination
                        ctx.SetError(err)
                        close(doneChan)
                }
        }()
 
-       // Wait for both forwarding goroutines to complete
        wg.Wait()
-       close(errChan) // Close channel to allow the final error check to complete
+       close(errChan)
 
-       // Final check for any errors that might have occurred
        for err := range errChan {
                if err != nil && ctx.Error == nil {
-                       // Set error if one hasn't been set already
                        ctx.SetError(err)
                }
        }
@@ -249,7 +307,24 @@ func (f *Filter) handleStream(ctx *grpcCtx.GrpcContext, 
address string) filter.F
 func (f *Filter) forwardClientToServer(ctx *grpcCtx.GrpcContext, clientStream 
grpc.ClientStream, wg *sync.WaitGroup, errChan chan<- error, doneChan <-chan 
struct{}) {
        defer wg.Done()
 
-       // Send initial arguments if available (for unary and server-stream 
calls)
+       useReflection := false
+       if val, ok := ctx.GetAttachment("_use_reflection"); ok {
+               useReflection, _ = val.(bool)
+       }
+
+       var methodDesc protoreflect.MethodDescriptor
+       if useReflection {
+               if md, ok := ctx.GetAttachment("_method_descriptor"); ok {
+                       methodDesc, _ = md.(protoreflect.MethodDescriptor)
+               }
+       }
+
+       // Check if this is a client-streaming call (client-stream or 
bidirectional)
+       isClientStreaming := false
+       if val, ok := ctx.GetAttachment("_is_client_streaming"); ok {
+               isClientStreaming, _ = val.(bool)
+       }
+
        if len(ctx.Arguments) > 0 {
                for _, arg := range ctx.Arguments {
                        if err := clientStream.SendMsg(arg); err != nil {
@@ -259,17 +334,46 @@ func (f *Filter) forwardClientToServer(ctx 
*grpcCtx.GrpcContext, clientStream gr
                }
        }
 
-       // Continuously forward messages from the client stream
+       // For unary/server-stream: read one message then CloseSend to prevent 
deadlock
+       if !isClientStreaming {
+               var msg []byte
+               if err := ctx.Stream.RecvMsg(&msg); err != nil {
+                       if err == io.EOF {
+                               if err := clientStream.CloseSend(); err != nil {
+                                       logger.Errorf("Error closing send 
stream to backend: %v", err)
+                               }
+                               return
+                       }
+                       errChan <- errors.Wrap(err, "error receiving from 
client")
+                       return
+               }
+
+               if methodDesc != nil {
+                       if decoded, err := DecodeRequest(methodDesc, msg); err 
== nil {
+                               logger.Debugf("Decoded request message: %v", 
decoded.Message)
+                       }
+               }
+
+               if err := clientStream.SendMsg(msg); err != nil {
+                       errChan <- errors.Wrap(err, "error forwarding to 
backend")
+                       return
+               }
+
+               if err := clientStream.CloseSend(); err != nil {
+                       logger.Errorf("Error closing send stream to backend: 
%v", err)
+               }
+               return
+       }
+
+       // Client-streaming: continuously forward messages until EOF
        for {
                select {
                case <-doneChan:
-                       // Stop forwarding if the done signal is received
                        return
                default:
                        var msg []byte
                        if err := ctx.Stream.RecvMsg(&msg); err != nil {
                                if err == io.EOF {
-                                       // Client has finished sending, so 
close the send direction of the backend stream
                                        if err := clientStream.CloseSend(); err 
!= nil {
                                                logger.Errorf("Error closing 
send stream to backend: %v", err)
                                        }
@@ -279,6 +383,12 @@ func (f *Filter) forwardClientToServer(ctx 
*grpcCtx.GrpcContext, clientStream gr
                                return
                        }
 
+                       if methodDesc != nil {
+                               if decoded, err := DecodeRequest(methodDesc, 
msg); err == nil {
+                                       logger.Debugf("Decoded request message: 
%v", decoded.Message)
+                               }
+                       }
+
                        if err := clientStream.SendMsg(msg); err != nil {
                                errChan <- errors.Wrap(err, "error forwarding 
to backend")
                                return
@@ -291,7 +401,13 @@ func (f *Filter) forwardClientToServer(ctx 
*grpcCtx.GrpcContext, clientStream gr
 func (f *Filter) forwardServerToClient(ctx *grpcCtx.GrpcContext, clientStream 
grpc.ClientStream, wg *sync.WaitGroup, errChan chan<- error, doneChan <-chan 
struct{}) {
        defer wg.Done()
 
-       // Forward header metadata from backend to client
+       var methodDesc protoreflect.MethodDescriptor
+       if val, ok := ctx.GetAttachment("_use_reflection"); ok && val.(bool) {
+               if md, ok := ctx.GetAttachment("_method_descriptor"); ok {
+                       methodDesc, _ = md.(protoreflect.MethodDescriptor)
+               }
+       }
+
        if header, err := clientStream.Header(); err == nil {
                if s, ok := ctx.Stream.(grpc.ServerStream); ok {
                        s.SetHeader(header)
@@ -301,23 +417,26 @@ func (f *Filter) forwardServerToClient(ctx 
*grpcCtx.GrpcContext, clientStream gr
        for {
                select {
                case <-doneChan:
-                       // Stop forwarding if the done signal is received
                        return
                default:
                        var resp []byte
                        err := clientStream.RecvMsg(&resp)
                        if err != nil {
-                               // Upon any error from the backend, including 
EOF, forward the trailer metadata
                                if s, ok := ctx.Stream.(grpc.ServerStream); ok {
                                        s.SetTrailer(clientStream.Trailer())
                                }
                                if err != io.EOF {
-                                       // Propagate the actual gRPC status 
error, but not EOF
                                        errChan <- err
                                }
                                return
                        }
 
+                       if methodDesc != nil {
+                               if decoded, err := DecodeResponse(methodDesc, 
resp); err == nil {
+                                       logger.Debugf("Decoded response 
message: %v", decoded.Message)
+                               }
+                       }
+
                        if err := ctx.Stream.SendMsg(resp); err != nil {
                                errChan <- errors.Wrap(err, "failed to forward 
response to client")
                                return
@@ -496,6 +615,12 @@ func (f *Filter) createTLSCredentials() 
(credentials.TransportCredentials, error
 func (f *Filter) Close() error {
        logger.Info("Closing gRPC proxy filter and all connections")
 
+       // Close reflection manager if initialized
+       if f.reflectionManager != nil {
+               f.reflectionManager.Close()
+               logger.Info("Reflection manager closed")
+       }
+
        var wg sync.WaitGroup
        var closeErrors []error
        var errorMu sync.Mutex
diff --git 
a/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go 
b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go
new file mode 100644
index 00000000..1343ad9c
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/grpc_proxy_filter_test.go
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "testing"
+       "time"
+)
+
+func TestParseDurationWithDefault(t *testing.T) {
+       tests := []struct {
+               name        string
+               durationStr string
+               defaultVal  time.Duration
+               expected    time.Duration
+       }{
+               {
+                       name:        "empty string uses default",
+                       durationStr: "",
+                       defaultVal:  30 * time.Second,
+                       expected:    30 * time.Second,
+               },
+               {
+                       name:        "valid duration seconds",
+                       durationStr: "10s",
+                       defaultVal:  30 * time.Second,
+                       expected:    10 * time.Second,
+               },
+               {
+                       name:        "valid duration minutes",
+                       durationStr: "5m",
+                       defaultVal:  30 * time.Second,
+                       expected:    5 * time.Minute,
+               },
+               {
+                       name:        "valid duration hours",
+                       durationStr: "2h",
+                       defaultVal:  30 * time.Second,
+                       expected:    2 * time.Hour,
+               },
+               {
+                       name:        "valid duration milliseconds",
+                       durationStr: "500ms",
+                       defaultVal:  1 * time.Second,
+                       expected:    500 * time.Millisecond,
+               },
+               {
+                       name:        "valid complex duration",
+                       durationStr: "1h30m",
+                       defaultVal:  30 * time.Second,
+                       expected:    1*time.Hour + 30*time.Minute,
+               },
+               {
+                       name:        "invalid duration uses default",
+                       durationStr: "invalid",
+                       defaultVal:  45 * time.Second,
+                       expected:    45 * time.Second,
+               },
+               {
+                       name:        "number without unit uses default",
+                       durationStr: "100",
+                       defaultVal:  20 * time.Second,
+                       expected:    20 * time.Second,
+               },
+               {
+                       name:        "negative duration",
+                       durationStr: "-5s",
+                       defaultVal:  10 * time.Second,
+                       expected:    -5 * time.Second,
+               },
+               {
+                       name:        "zero duration",
+                       durationStr: "0s",
+                       defaultVal:  10 * time.Second,
+                       expected:    0,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := parseDurationWithDefault(tt.durationStr, 
tt.defaultVal)
+                       if result != tt.expected {
+                               t.Errorf("parseDurationWithDefault(%q, %v) = 
%v, want %v",
+                                       tt.durationStr, tt.defaultVal, result, 
tt.expected)
+                       }
+               })
+       }
+}
+
+func TestPlugin_Kind(t *testing.T) {
+       plugin := Plugin{}
+       expected := Kind
+
+       if got := plugin.Kind(); got != expected {
+               t.Errorf("Kind() = %q, want %q", got, expected)
+       }
+}
+
+func TestReflectionModeConstants(t *testing.T) {
+       // Verify reflection mode constants are defined correctly
+       tests := []struct {
+               name     string
+               mode     string
+               expected string
+       }{
+               {"passthrough mode", ReflectionModePassthrough, "passthrough"},
+               {"reflection mode", ReflectionModeReflection, "reflection"},
+               {"hybrid mode", ReflectionModeHybrid, "hybrid"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if tt.mode != tt.expected {
+                               t.Errorf("%s = %q, want %q", tt.name, tt.mode, 
tt.expected)
+                       }
+               })
+       }
+}
+
+func TestConfig_DefaultValues(t *testing.T) {
+       // Test that config can be created with default values
+       config := &Config{}
+
+       // Verify default empty values
+       if config.ReflectionMode != "" {
+               t.Errorf("default ReflectionMode = %q, want empty", 
config.ReflectionMode)
+       }
+       if config.DescriptorCacheTTLStr != "" {
+               t.Errorf("default DescriptorCacheTTLStr = %q, want empty", 
config.DescriptorCacheTTLStr)
+       }
+       if config.ExtractTripleMetadata {
+               t.Error("default ExtractTripleMetadata should be false")
+       }
+}
+
+func TestConfig_WithReflectionSettings(t *testing.T) {
+       config := &Config{
+               ReflectionMode:        ReflectionModeReflection,
+               DescriptorCacheTTLStr: "10m",
+               ExtractTripleMetadata: true,
+       }
+
+       if config.ReflectionMode != "reflection" {
+               t.Errorf("ReflectionMode = %q, want %q", config.ReflectionMode, 
"reflection")
+       }
+
+       // Parse the TTL
+       ttl := parseDurationWithDefault(config.DescriptorCacheTTLStr, 
5*time.Minute)
+       if ttl != 10*time.Minute {
+               t.Errorf("parsed TTL = %v, want %v", ttl, 10*time.Minute)
+       }
+
+       if !config.ExtractTripleMetadata {
+               t.Error("ExtractTripleMetadata should be true")
+       }
+}
+
+func TestFilter_Close_NilReflectionManager(t *testing.T) {
+       // Test that Close handles nil reflection manager gracefully
+       filter := &Filter{
+               Config:            &Config{},
+               reflectionManager: nil,
+       }
+
+       // Should not panic
+       filter.Close()
+}
+
+func TestFilter_Close_WithReflectionManager(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       filter := &Filter{
+               Config:            &Config{},
+               reflectionManager: rm,
+       }
+
+       // Should not panic and should close the manager
+       filter.Close()
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/protocol.go 
b/pkg/filter/network/grpcproxy/filter/proxy/protocol.go
new file mode 100644
index 00000000..70e0193c
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/protocol.go
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "strings"
+)
+
+// Triple-specific header prefix
+const (
+       TripleHeaderPrefix = "tri-"
+)
+
+// ExtractTripleMetadata extracts Triple-specific metadata from attachments.
+// This is used to extract tri-* headers for routing decisions based on
+// service version, group, etc. Note: This filter is only used by gRPC 
listener,
+// which handles gRPC protocol. Triple protocol should use triple listener.
+// This function is kept for potential cross-protocol scenarios where Triple
+// headers might be present in gRPC requests for routing purposes.
+func ExtractTripleMetadata(attachments map[string]any) map[string]string {
+       meta := make(map[string]string)
+       for k, v := range attachments {
+               key := strings.ToLower(k)
+               if strings.HasPrefix(key, TripleHeaderPrefix) {
+                       if str, ok := v.(string); ok {
+                               meta[key] = str
+                       }
+               }
+       }
+       return meta
+}
+
+// IsTripleHeader checks if a header key is Triple-specific
+func IsTripleHeader(key string) bool {
+       return strings.HasPrefix(strings.ToLower(key), TripleHeaderPrefix)
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go 
b/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go
new file mode 100644
index 00000000..6cff9437
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/protocol_test.go
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "testing"
+)
+
+func TestExtractTripleMetadata(t *testing.T) {
+       tests := []struct {
+               name        string
+               attachments map[string]any
+               expected    map[string]string
+       }{
+               {
+                       name: "with triple headers",
+                       attachments: map[string]any{
+                               "tri-service-version": "1.0.0",
+                               "tri-service-group":   "production",
+                               "tri-req-id":          "12345",
+                               "other-header":        "value",
+                       },
+                       expected: map[string]string{
+                               "tri-service-version": "1.0.0",
+                               "tri-service-group":   "production",
+                               "tri-req-id":          "12345",
+                       },
+               },
+               {
+                       name: "no triple headers",
+                       attachments: map[string]any{
+                               "content-type":  "application/grpc",
+                               "authorization": "Bearer token",
+                       },
+                       expected: map[string]string{},
+               },
+               {
+                       name:        "nil attachments",
+                       attachments: nil,
+                       expected:    map[string]string{},
+               },
+               {
+                       name:        "empty attachments",
+                       attachments: map[string]any{},
+                       expected:    map[string]string{},
+               },
+               {
+                       name: "mixed case headers",
+                       attachments: map[string]any{
+                               "Tri-Service-Version": "2.0.0",
+                               "TRI-SERVICE-GROUP":   "staging",
+                       },
+                       expected: map[string]string{
+                               "tri-service-version": "2.0.0",
+                               "tri-service-group":   "staging",
+                       },
+               },
+               {
+                       name: "non-string values ignored",
+                       attachments: map[string]any{
+                               "tri-service-version": "1.0.0",
+                               "tri-count":           123, // non-string, 
should be ignored
+                       },
+                       expected: map[string]string{
+                               "tri-service-version": "1.0.0",
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := ExtractTripleMetadata(tt.attachments)
+                       if len(result) != len(tt.expected) {
+                               t.Errorf("ExtractTripleMetadata() returned %d 
items, want %d", len(result), len(tt.expected))
+                       }
+                       for k, v := range tt.expected {
+                               if result[k] != v {
+                                       t.Errorf("ExtractTripleMetadata()[%q] = 
%q, want %q", k, result[k], v)
+                               }
+                       }
+               })
+       }
+}
+
+func TestIsTripleHeader(t *testing.T) {
+       tests := []struct {
+               name     string
+               key      string
+               expected bool
+       }{
+               {"lowercase tri prefix", "tri-service-version", true},
+               {"uppercase TRI prefix", "TRI-SERVICE-VERSION", true},
+               {"mixed case", "Tri-Service-Group", true},
+               {"no tri prefix", "content-type", false},
+               {"partial match", "triangle", false},
+               {"empty string", "", false},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := IsTripleHeader(tt.key)
+                       if result != tt.expected {
+                               t.Errorf("IsTripleHeader(%q) = %v, want %v", 
tt.key, result, tt.expected)
+                       }
+               })
+       }
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go 
b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go
new file mode 100644
index 00000000..9da1621f
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager.go
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "context"
+       "crypto/md5"
+       "fmt"
+       "io"
+       "sync"
+       "time"
+)
+
+import (
+       "github.com/pkg/errors"
+
+       "golang.org/x/sync/singleflight"
+
+       "google.golang.org/grpc"
+       rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+
+       "google.golang.org/protobuf/proto"
+
+       "google.golang.org/protobuf/reflect/protodesc"
+       "google.golang.org/protobuf/reflect/protoreflect"
+       "google.golang.org/protobuf/reflect/protoregistry"
+
+       "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+const (
+       defaultDescCacheTTL = 5 * time.Minute
+       reflectionTimeout   = 10 * time.Second
+       // versionHashLength is the length of truncated MD5 hash used for 
version detection.
+       // 16 hex characters (64 bits) provide sufficient uniqueness for cache 
invalidation
+       // while keeping the hash compact for logging and comparison.
+       versionHashLength = 16
+)
+
+// ReflectionVersion specifies which version of the reflection API to use
+type ReflectionVersion string
+
+const (
+       // ReflectionV1Alpha uses the v1alpha reflection API
+       ReflectionV1Alpha ReflectionVersion = "v1alpha"
+       // ReflectionV1 uses the v1 reflection API (not yet implemented, 
reserved)
+       ReflectionV1 ReflectionVersion = "v1"
+       // ReflectionAuto attempts to detect the available version
+       ReflectionAuto ReflectionVersion = "auto"
+)
+
+// ReflectionConfig holds configuration for the reflection manager
+type ReflectionConfig struct {
+       CacheTTL          time.Duration
+       MaxCacheSize      int
+       ReflectionVersion ReflectionVersion
+       ContinueOnError   bool // Continue with partial results on dependency 
resolution failure
+}
+
+// ReflectionCacheStats holds cache statistics for the reflection manager
+type ReflectionCacheStats struct {
+       MethodCacheSize      int     `json:"method_cache_size"`
+       MethodCacheHits      int64   `json:"method_cache_hits"`
+       MethodCacheMisses    int64   `json:"method_cache_misses"`
+       MethodCacheEvictions int64   `json:"method_cache_evictions"`
+       MethodCacheHitRatio  float64 `json:"method_cache_hit_ratio"`
+       FileRegistryCount    int     `json:"file_registry_count"`
+       TTLSeconds           float64 `json:"ttl_secs"`
+       MaxCacheSize         int     `json:"max_cache_size"`
+}
+
+// fileRegistryWithMetadata holds a file registry with metadata for version 
tracking
+type fileRegistryWithMetadata struct {
+       files       *protoregistry.Files
+       versionHash string
+       timestamp   time.Time
+}
+
+// ReflectionManager manages gRPC reflection clients and descriptor caching
+// using official google.golang.org/protobuf libraries
+type ReflectionManager struct {
+       cache    *DescriptorCache
+       cacheTTL time.Duration
+       config   ReflectionConfig
+       // fileDescCache caches file descriptors per address with metadata
+       fileDescCache sync.Map // address -> *fileRegistryWithMetadata
+       // fileRegistryGroup uses singleflight to ensure only one goroutine
+       // performs reflection for each address at a time
+       fileRegistryGroup singleflight.Group
+       // Track missing dependencies per address for monitoring
+       missingDeps sync.Map // address -> []string
+}
+
+// NewReflectionManager creates a new reflection manager with default config
+func NewReflectionManager(cacheTTL time.Duration) *ReflectionManager {
+       return NewReflectionManagerWithConfig(ReflectionConfig{
+               CacheTTL:          cacheTTL,
+               MaxCacheSize:      defaultMaxCacheSize,
+               ReflectionVersion: ReflectionV1Alpha,
+               ContinueOnError:   true,
+       })
+}
+
+// NewReflectionManagerWithConfig creates a new reflection manager with custom 
config
+func NewReflectionManagerWithConfig(config ReflectionConfig) 
*ReflectionManager {
+       if config.CacheTTL <= 0 {
+               config.CacheTTL = defaultDescCacheTTL
+       }
+       if config.MaxCacheSize < minCacheSize {
+               config.MaxCacheSize = minCacheSize
+       }
+       if config.ReflectionVersion == "" {
+               config.ReflectionVersion = ReflectionV1Alpha
+       }
+
+       return &ReflectionManager{
+               cache:    NewDescriptorCacheWithSize(config.CacheTTL, 
config.MaxCacheSize),
+               cacheTTL: config.CacheTTL,
+               config:   config,
+       }
+}
+
+// GetMethodDescriptor retrieves a method descriptor using gRPC reflection
+// Results are cached for improved performance
+func (rm *ReflectionManager) GetMethodDescriptor(
+       ctx context.Context,
+       conn *grpc.ClientConn,
+       address string,
+       serviceName string,
+       methodName string,
+) (protoreflect.MethodDescriptor, error) {
+       // Build cache key
+       cacheKey := BuildCacheKey(address, serviceName, methodName)
+
+       // Check cache first (with version check)
+       if cached := rm.cache.Get(cacheKey); cached != nil {
+               logger.Debugf("Reflection cache hit for %s", cacheKey)
+               return cached, nil
+       }
+
+       logger.Debugf("Reflection cache miss for %s, performing reflection", 
cacheKey)
+
+       // Perform reflection with timeout
+       reflectCtx, cancel := context.WithTimeout(ctx, reflectionTimeout)
+       defer cancel()
+
+       // Get or create file registry for this address
+       files, versionHash, err := rm.getOrCreateFileRegistry(reflectCtx, conn, 
address, serviceName)
+       if err != nil {
+               return nil, err
+       }
+
+       // Find service descriptor
+       serviceDesc, err := 
files.FindDescriptorByName(protoreflect.FullName(serviceName))
+       if err != nil {
+               return nil, errors.Wrapf(err, "failed to find service %s", 
serviceName)
+       }
+
+       svcDesc, ok := serviceDesc.(protoreflect.ServiceDescriptor)
+       if !ok {
+               return nil, fmt.Errorf("%s is not a service", serviceName)
+       }
+
+       // Find method descriptor
+       methodDesc := svcDesc.Methods().ByName(protoreflect.Name(methodName))
+       if methodDesc == nil {
+               return nil, fmt.Errorf("method %s not found in service %s", 
methodName, serviceName)
+       }
+
+       // Cache the result with version hash
+       rm.cache.SetWithVersion(cacheKey, methodDesc, versionHash)
+       logger.Debugf("Cached method descriptor for %s (version: %s)", 
cacheKey, versionHash)
+
+       return methodDesc, nil
+}
+
+// getOrCreateFileRegistry gets or creates a file registry for the given 
address
+// Uses singleflight to ensure only one goroutine performs reflection per 
address
+// Returns the files registry and version hash for change detection
+func (rm *ReflectionManager) getOrCreateFileRegistry(
+       ctx context.Context,
+       conn *grpc.ClientConn,
+       address string,
+       serviceName string,
+) (*protoregistry.Files, string, error) {
+       // Check cache first (fast path, no locking)
+       if cached, ok := rm.fileDescCache.Load(address); ok {
+               reg := cached.(*fileRegistryWithMetadata)
+               return reg.files, reg.versionHash, nil
+       }
+
+       // Use singleflight to deduplicate concurrent reflection requests
+       // for the same address. This prevents thundering herd problem.
+       result, err, shared := rm.fileRegistryGroup.Do(address, func() (any, 
error) {
+               // Double-check cache in case another goroutine just filled it
+               if cached, ok := rm.fileDescCache.Load(address); ok {
+                       reg := cached.(*fileRegistryWithMetadata)
+                       return registryWithVersion{files: reg.files, 
versionHash: reg.versionHash}, nil
+               }
+
+               // Create reflection client based on configured version
+               client := rpb.NewServerReflectionClient(conn)
+               stream, err := client.ServerReflectionInfo(ctx)
+               if err != nil {
+                       return nil, errors.Wrap(err, "failed to create 
reflection stream")
+               }
+               defer stream.CloseSend()
+
+               // Request file descriptor for the service
+               fileDescs, missingDeps, err := 
rm.resolveServiceFileDescriptors(stream, serviceName)
+               if err != nil && !rm.config.ContinueOnError {
+                       return nil, err
+               }
+
+               // Log missing dependencies if any
+               if len(missingDeps) > 0 {
+                       logger.Warnf("Missing dependencies for %s: %v", address, missingDeps)
+                       rm.missingDeps.Store(address, missingDeps)
+               }
+
+               // Compute version hash from all file descriptors
+               versionHash := rm.computeVersionHash(fileDescs)
+
+               // Build file registry
+               files, err := rm.buildFileRegistry(fileDescs)
+               if err != nil && !rm.config.ContinueOnError {
+                       return nil, err
+               }
+
+               // Cache the registry with metadata for future requests
+               rm.fileDescCache.Store(address, &fileRegistryWithMetadata{
+                       files:       files,
+                       versionHash: versionHash,
+                       timestamp:   time.Now(),
+               })
+
+               return registryWithVersion{files: files, versionHash: versionHash}, nil
+       })
+
+       if err != nil {
+               return nil, "", err
+       }
+
+       if shared {
+               logger.Debugf("Reflection request for %s shared with another goroutine", address)
+       }
+
+       rw := result.(registryWithVersion)
+       return rw.files, rw.versionHash, nil
+}
+
+// registryWithVersion is a helper type to return both files and version hash from singleflight
+type registryWithVersion struct {
+       files       *protoregistry.Files
+       versionHash string
+}
+
+// resolveServiceFileDescriptors resolves all file descriptors needed for a service
+// Returns file descriptors, list of missing dependencies, and error
+func (rm *ReflectionManager) resolveServiceFileDescriptors(
+       stream rpb.ServerReflection_ServerReflectionInfoClient,
+       serviceName string,
+) ([]*descriptorpb.FileDescriptorProto, []string, error) {
+       // Request file containing the service
+       req := &rpb.ServerReflectionRequest{
+               MessageRequest: &rpb.ServerReflectionRequest_FileContainingSymbol{
+                       FileContainingSymbol: serviceName,
+               },
+       }
+
+       if err := stream.Send(req); err != nil {
+               return nil, nil, errors.Wrap(err, "failed to send reflection request")
+       }
+
+       resp, err := stream.Recv()
+       if err != nil {
+               if err == io.EOF {
+                       return nil, nil, errors.New("unexpected EOF when receiving reflection response")
+               }
+               return nil, nil, errors.Wrap(err, "failed to receive reflection response")
+       }
+
+       fdResp := resp.GetFileDescriptorResponse()
+       if fdResp == nil {
+               if errResp := resp.GetErrorResponse(); errResp != nil {
+                       return nil, nil, fmt.Errorf("reflection error: %s", errResp.ErrorMessage)
+               }
+               return nil, nil, errors.New("unexpected reflection response type")
+       }
+
+       // Parse file descriptors
+       fileDescs := make([]*descriptorpb.FileDescriptorProto, 0, len(fdResp.FileDescriptorProto))
+       for _, fdBytes := range fdResp.FileDescriptorProto {
+               fd := &descriptorpb.FileDescriptorProto{}
+               if err := proto.Unmarshal(fdBytes, fd); err != nil {
+                       return nil, nil, errors.Wrap(err, "failed to unmarshal 
file descriptor")
+               }
+               fileDescs = append(fileDescs, fd)
+       }
+
+       // Resolve dependencies recursively
+       resolved := make(map[string]bool)
+       missingDeps := make([]string, 0)
+       for _, fd := range fileDescs {
+               resolved[fd.GetName()] = true
+       }
+
+       for _, fd := range fileDescs {
+               deps, missing, err := rm.resolveDependencies(stream, fd.GetDependency(), resolved)
+               if err != nil {
+                       if !rm.config.ContinueOnError {
+                               return nil, nil, err
+                       }
+                       logger.Warnf("Failed to resolve dependencies for %s: %v", fd.GetName(), err)
+               }
+               missingDeps = append(missingDeps, missing...)
+               fileDescs = append(fileDescs, deps...)
+       }
+
+       return fileDescs, missingDeps, nil
+}
+
+// resolveDependencies resolves file descriptor dependencies
+// Returns resolved descriptors, list of missing dependencies, and error
+func (rm *ReflectionManager) resolveDependencies(
+       stream rpb.ServerReflection_ServerReflectionInfoClient,
+       deps []string,
+       resolved map[string]bool,
+) ([]*descriptorpb.FileDescriptorProto, []string, error) {
+       var result []*descriptorpb.FileDescriptorProto
+       var missingDeps []string
+
+       for _, dep := range deps {
+               if resolved[dep] {
+                       continue
+               }
+
+               req := &rpb.ServerReflectionRequest{
+                       MessageRequest: &rpb.ServerReflectionRequest_FileByFilename{
+                               FileByFilename: dep,
+                       },
+               }
+
+               if err := stream.Send(req); err != nil {
+                       if err == io.EOF {
+                               // Stream closed by server, return what we have collected so far
+                               logger.Debugf("Stream EOF during send for dependency %s", dep)
+                               return result, missingDeps, nil
+                       }
+                       if !rm.config.ContinueOnError {
+                               return nil, nil, errors.Wrapf(err, "failed to send request for %s", dep)
+                       }
+                       logger.Warnf("Failed to send request for dependency %s: %v", dep, err)
+                       missingDeps = append(missingDeps, dep)
+                       continue
+               }
+
+               resp, err := stream.Recv()
+               if err != nil {
+                       if err == io.EOF {
+                               // Stream closed by server, return what we have collected so far
+                               logger.Debugf("Stream EOF during recv for dependency %s", dep)
+                               return result, missingDeps, nil
+                       }
+                       if !rm.config.ContinueOnError {
+                               return nil, nil, errors.Wrapf(err, "failed to receive response for %s", dep)
+                       }
+                       logger.Warnf("Failed to receive response for dependency %s: %v", dep, err)
+                       missingDeps = append(missingDeps, dep)
+                       continue
+               }
+
+               fdResp := resp.GetFileDescriptorResponse()
+               if fdResp == nil {
+                       // No file descriptor in response, could be an error response or unexpected type
+                       if errResp := resp.GetErrorResponse(); errResp != nil {
+                               logger.Warnf("Reflection error for dependency %s: %s", dep, errResp.ErrorMessage)
+                               // Mark as missing and continue if ContinueOnError is enabled
+                               if rm.config.ContinueOnError {
+                                       missingDeps = append(missingDeps, dep)
+                                       continue
+                               }
+                               return nil, nil, fmt.Errorf("reflection error for dependency %s: %s", dep, errResp.ErrorMessage)
+                       }
+                       // Unexpected response type, skip this dependency
+                       missingDeps = append(missingDeps, dep)
+                       continue
+               }
+
+               for _, fdBytes := range fdResp.FileDescriptorProto {
+                       fd := &descriptorpb.FileDescriptorProto{}
+                       if err := proto.Unmarshal(fdBytes, fd); err != nil {
+                               if !rm.config.ContinueOnError {
+                                       return nil, nil, errors.Wrap(err, "failed to unmarshal dependency")
+                               }
+                               logger.Warnf("Failed to unmarshal dependency %s: %v", dep, err)
+                               continue
+                       }
+                       if !resolved[fd.GetName()] {
+                               resolved[fd.GetName()] = true
+                               result = append(result, fd)
+
+                               // Resolve nested dependencies
+                               nested, nestedMissing, err := rm.resolveDependencies(stream, fd.GetDependency(), resolved)
+                               if err != nil {
+                                       if !rm.config.ContinueOnError {
+                                               return nil, nil, err
+                                       }
+                                       logger.Warnf("Failed to resolve nested dependencies for %s: %v", fd.GetName(), err)
+                               }
+                               missingDeps = append(missingDeps, nestedMissing...)
+                               result = append(result, nested...)
+                       }
+               }
+       }
+
+       return result, missingDeps, nil
+}
+
+// buildFileRegistry builds a protoregistry.Files from file descriptor protos
+// Continues on error if ContinueOnError is enabled
+func (rm *ReflectionManager) buildFileRegistry(
+       fileDescs []*descriptorpb.FileDescriptorProto,
+) (*protoregistry.Files, error) {
+       files := new(protoregistry.Files)
+
+       // Build dependency graph and sort topologically
+       sorted, err := rm.topologicalSort(fileDescs)
+       if err != nil {
+               if !rm.config.ContinueOnError {
+                       return nil, err
+               }
+               logger.Warnf("Topological sort failed, using original order: %v", err)
+               sorted = fileDescs
+       }
+
+       // Register files in dependency order
+       registeredCount := 0
+       for _, fd := range sorted {
+               fileDesc, err := protodesc.NewFile(fd, files)
+               if err != nil {
+                       // Skip files that fail to register (might be already registered or missing deps)
+                       logger.Debugf("Skipping file %s: %v", fd.GetName(), err)
+                       continue
+               }
+               if err := files.RegisterFile(fileDesc); err != nil {
+                       // Skip already registered files
+                       logger.Debugf("Skipping duplicate file %s: %v", fd.GetName(), err)
+                       continue
+               }
+               registeredCount++
+       }
+
+       logger.Debugf("Registered %d/%d file descriptors successfully", registeredCount, len(sorted))
+
+       if registeredCount == 0 && !rm.config.ContinueOnError {
+               return nil, errors.New("failed to register any file descriptors")
+       }
+
+       return files, nil
+}
+
+// topologicalSort sorts file descriptors by dependency order
+func (rm *ReflectionManager) topologicalSort(
+       fileDescs []*descriptorpb.FileDescriptorProto,
+) ([]*descriptorpb.FileDescriptorProto, error) {
+       // Build a map for quick lookup
+       fdMap := make(map[string]*descriptorpb.FileDescriptorProto)
+       for _, fd := range fileDescs {
+               fdMap[fd.GetName()] = fd
+       }
+
+       visited := make(map[string]bool)
+       inStack := make(map[string]bool)
+       var result []*descriptorpb.FileDescriptorProto
+
+       var visit func(name string) error
+       visit = func(name string) error {
+               if inStack[name] {
+                       // Circular dependency detected
+                       logger.Warnf("Circular dependency detected involving: %s", name)
+                       // Don't return error, just skip to avoid infinite loop
+                       return nil
+               }
+               if visited[name] {
+                       return nil
+               }
+
+               fd, ok := fdMap[name]
+               if !ok {
+                       // External dependency, skip
+                       return nil
+               }
+
+               inStack[name] = true
+               for _, dep := range fd.GetDependency() {
+                       if err := visit(dep); err != nil {
+                               return err
+                       }
+               }
+               inStack[name] = false
+               visited[name] = true
+               result = append(result, fd)
+               return nil
+       }
+
+       for name := range fdMap {
+               if err := visit(name); err != nil {
+                       return nil, err
+               }
+       }
+
+       return result, nil
+}
+
+// computeVersionHash computes a hash from file descriptors for version detection
+func (rm *ReflectionManager) computeVersionHash(fileDescs []*descriptorpb.FileDescriptorProto) string {
+       h := md5.New()
+
+       // Collect the file names in resolution order
+       names := make([]string, len(fileDescs))
+       for i, fd := range fileDescs {
+               names[i] = fd.GetName()
+       }
+
+       // Write each name to the hash, separated by a null byte
+       for _, name := range names {
+               h.Write([]byte(name))
+               h.Write([]byte{0}) // null separator
+       }
+
+       return fmt.Sprintf("%x", h.Sum(nil))[:versionHashLength]
+}
+
+// InvalidateCache removes cached descriptors for a specific address
+func (rm *ReflectionManager) InvalidateCache(address string) {
+       rm.fileDescCache.Delete(address)
+       // Forget the singleflight key to allow new reflection requests
+       rm.fileRegistryGroup.Forget(address)
+       // Clear missing dependencies tracking
+       rm.missingDeps.Delete(address)
+       logger.Infof("Invalidated cache for address: %s", address)
+}
+
+// InvalidateByVersion removes all cached entries with a specific version hash
+func (rm *ReflectionManager) InvalidateByVersion(versionHash string) int {
+       count := 0
+       rm.fileDescCache.Range(func(key, value any) bool {
+               reg := value.(*fileRegistryWithMetadata)
+               if reg.versionHash == versionHash {
+                       rm.fileDescCache.Delete(key)
+                       if address, ok := key.(string); ok {
+                               rm.fileRegistryGroup.Forget(address)
+                       }
+                       count++
+               }
+               return true
+       })
+       // Also invalidate method cache with this version
+       count += rm.cache.InvalidateByVersion(versionHash)
+       if count > 0 {
+               logger.Infof("Invalidated %d cache entries with version hash: %s", count, versionHash)
+       }
+       return count
+}
+
+// ClearCache clears all cached descriptors
+func (rm *ReflectionManager) ClearCache() {
+       rm.cache.Clear()
+       // Clear file descriptor cache and singleflight cache
+       rm.fileDescCache.Range(func(key, _ any) bool {
+               rm.fileDescCache.Delete(key)
+               // Forget the singleflight key to allow new reflection requests
+               if address, ok := key.(string); ok {
+                       rm.fileRegistryGroup.Forget(address)
+               }
+               return true
+       })
+       // Clear missing dependencies tracking
+       rm.missingDeps.Range(func(key, _ any) bool {
+               rm.missingDeps.Delete(key)
+               return true
+       })
+       logger.Info("Reflection descriptor cache cleared")
+}
+
+// Close cleans up resources
+func (rm *ReflectionManager) Close() {
+       rm.cache.Close()
+}
+
+// WarmupService pre-fetches and caches descriptors for a specific service.
+// This eliminates cold-start latency for the first request to this service.
+// The conn parameter should be a connection to the backend server.
+func (rm *ReflectionManager) WarmupService(ctx context.Context, conn *grpc.ClientConn, address, serviceName string) error {
+       _, _, err := rm.getOrCreateFileRegistry(ctx, conn, address, serviceName)
+       if err != nil {
+               return errors.Wrapf(err, "warmup failed for service %s at %s", serviceName, address)
+       }
+       logger.Infof("Warmup completed for service %s at %s", serviceName, address)
+       return nil
+}
+
+// WarmupServices pre-fetches and caches descriptors for multiple services concurrently.
+// Returns a map of service names to errors (nil if successful).
+func (rm *ReflectionManager) WarmupServices(ctx context.Context, conn *grpc.ClientConn, address string, serviceNames []string) map[string]error {
+       results := make(map[string]error)
+       var mu sync.Mutex
+       var wg sync.WaitGroup
+
+       for _, svc := range serviceNames {
+               wg.Add(1)
+               go func(serviceName string) {
+                       defer wg.Done()
+                       err := rm.WarmupService(ctx, conn, address, serviceName)
+                       mu.Lock()
+                       results[serviceName] = err
+                       mu.Unlock()
+               }(svc)
+       }
+
+       wg.Wait()
+       return results
+}
+
+// GetCacheStats returns cache statistics for monitoring
+func (rm *ReflectionManager) GetCacheStats() ReflectionCacheStats {
+       fileCount := 0
+       rm.fileDescCache.Range(func(_, _ any) bool {
+               fileCount++
+               return true
+       })
+
+       methodStats := rm.cache.GetStats()
+
+       return ReflectionCacheStats{
+               MethodCacheSize:      methodStats.Size,
+               MethodCacheHits:      methodStats.Hits,
+               MethodCacheMisses:    methodStats.Misses,
+               MethodCacheEvictions: methodStats.Evictions,
+               MethodCacheHitRatio:  methodStats.HitRatio,
+               FileRegistryCount:    fileCount,
+               TTLSeconds:           rm.cacheTTL.Seconds(),
+               MaxCacheSize:         methodStats.MaxSize,
+       }
+}
+
+// GetMissingDependencies returns missing dependencies for a given address
+func (rm *ReflectionManager) GetMissingDependencies(address string) []string {
+       if missing, ok := rm.missingDeps.Load(address); ok {
+               return missing.([]string)
+       }
+       return nil
+}
+
+// GetAllMissingDependencies returns all missing dependencies across all addresses
+func (rm *ReflectionManager) GetAllMissingDependencies() map[string][]string {
+       result := make(map[string][]string)
+       rm.missingDeps.Range(func(key, value any) bool {
+               if address, ok := key.(string); ok {
+                       result[address] = value.([]string)
+               }
+               return true
+       })
+       return result
+}
+
+// SetConfig updates the reflection manager configuration
+func (rm *ReflectionManager) SetConfig(config ReflectionConfig) {
+       if config.CacheTTL > 0 {
+               rm.cacheTTL = config.CacheTTL
+               rm.config.CacheTTL = config.CacheTTL
+       }
+       if config.MaxCacheSize >= minCacheSize {
+               rm.cache.SetMaxSize(config.MaxCacheSize)
+               rm.config.MaxCacheSize = config.MaxCacheSize
+       }
+       if config.ReflectionVersion != "" {
+               rm.config.ReflectionVersion = config.ReflectionVersion
+       }
+       rm.config.ContinueOnError = config.ContinueOnError
+
+       logger.Infof("Reflection manager config updated: version=%s, continueOnError=%v",
+               rm.config.ReflectionVersion, rm.config.ContinueOnError)
+}
+
+// GetConfig returns the current configuration
+func (rm *ReflectionManager) GetConfig() ReflectionConfig {
+       return rm.config
+}
diff --git a/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go
new file mode 100644
index 00000000..99bfba36
--- /dev/null
+++ b/pkg/filter/network/grpcproxy/filter/proxy/reflection_manager_test.go
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "testing"
+       "time"
+)
+
+import (
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/descriptorpb"
+)
+
+func TestNewReflectionManager(t *testing.T) {
+       tests := []struct {
+               name     string
+               ttl      time.Duration
+               expected time.Duration
+       }{
+               {
+                       name:     "positive TTL",
+                       ttl:      10 * time.Minute,
+                       expected: 10 * time.Minute,
+               },
+               {
+                       name:     "zero TTL uses default",
+                       ttl:      0,
+                       expected: defaultDescCacheTTL,
+               },
+               {
+                       name:     "negative TTL uses default",
+                       ttl:      -1 * time.Minute,
+                       expected: defaultDescCacheTTL,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       rm := NewReflectionManager(tt.ttl)
+                       if rm == nil {
+                               t.Fatal("NewReflectionManager returned nil")
+                       }
+                       defer rm.Close()
+
+                       if rm.cache == nil {
+                               t.Error("cache is nil")
+                       }
+                       if rm.cacheTTL != tt.expected {
+                               t.Errorf("cacheTTL = %v, want %v", rm.cacheTTL, tt.expected)
+                       }
+               })
+       }
+}
+
+func TestReflectionManager_InvalidateCache(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       // Store something in the file descriptor cache
+       rm.fileDescCache.Store("test-address:50051", "dummy-value")
+
+       // Verify it's stored
+       if _, ok := rm.fileDescCache.Load("test-address:50051"); !ok {
+               t.Fatal("value not stored in fileDescCache")
+       }
+
+       // Invalidate
+       rm.InvalidateCache("test-address:50051")
+
+       // Verify it's gone
+       if _, ok := rm.fileDescCache.Load("test-address:50051"); ok {
+               t.Error("value still exists after InvalidateCache")
+       }
+}
+
+func TestReflectionManager_ClearCache(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       // Store multiple entries
+       rm.fileDescCache.Store("addr1:50051", "value1")
+       rm.fileDescCache.Store("addr2:50052", "value2")
+       rm.fileDescCache.Store("addr3:50053", "value3")
+
+       // Also set some method cache entries
+       rm.cache.Set("key1", nil)
+       rm.cache.Set("key2", nil)
+
+       // Clear all
+       rm.ClearCache()
+
+       // Verify file descriptor cache is cleared
+       count := 0
+       rm.fileDescCache.Range(func(_, _ any) bool {
+               count++
+               return true
+       })
+       if count != 0 {
+               t.Errorf("fileDescCache has %d entries after ClearCache, want 0", count)
+       }
+
+       // Verify method cache is cleared
+       if size := rm.cache.Size(); size != 0 {
+               t.Errorf("method cache has %d entries after ClearCache, want 0", size)
+       }
+}
+
+func TestReflectionManager_Close(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+
+       // Should not panic
+       rm.Close()
+
+       // Close again should also not panic
+       rm.Close()
+}
+
+func TestReflectionManager_GetCacheStats(t *testing.T) {
+       rm := NewReflectionManager(10 * time.Minute)
+       defer rm.Close()
+
+       // Initially empty
+       stats := rm.GetCacheStats()
+
+       if stats.MethodCacheSize != 0 {
+               t.Errorf("MethodCacheSize = %v, want 0", stats.MethodCacheSize)
+       }
+       if stats.FileRegistryCount != 0 {
+               t.Errorf("FileRegistryCount = %v, want 0", stats.FileRegistryCount)
+       }
+       if stats.TTLSeconds != 600.0 {
+               t.Errorf("TTLSeconds = %v, want 600", stats.TTLSeconds)
+       }
+
+       // Add some entries
+       rm.fileDescCache.Store("addr1", "value1")
+       rm.fileDescCache.Store("addr2", "value2")
+       rm.cache.Set("method1", nil)
+
+       stats = rm.GetCacheStats()
+       if stats.MethodCacheSize != 1 {
+               t.Errorf("MethodCacheSize = %v, want 1", stats.MethodCacheSize)
+       }
+       if stats.FileRegistryCount != 2 {
+               t.Errorf("FileRegistryCount = %v, want 2", stats.FileRegistryCount)
+       }
+}
+
+func TestReflectionManager_TopologicalSort(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       tests := []struct {
+               name      string
+               fileDescs []*descriptorpb.FileDescriptorProto
+               wantOrder []string // Expected order (dependencies before dependents)
+       }{
+               {
+                       name: "single file no deps",
+                       fileDescs: []*descriptorpb.FileDescriptorProto{
+                               {Name: proto.String("a.proto")},
+                       },
+                       wantOrder: []string{"a.proto"},
+               },
+               {
+                       name: "linear dependency chain",
+                       fileDescs: []*descriptorpb.FileDescriptorProto{
+                               {Name: proto.String("c.proto"), Dependency: []string{"b.proto"}},
+                               {Name: proto.String("b.proto"), Dependency: []string{"a.proto"}},
+                               {Name: proto.String("a.proto")},
+                       },
+                       wantOrder: []string{"a.proto", "b.proto", "c.proto"},
+               },
+               {
+                       name: "diamond dependency",
+                       fileDescs: []*descriptorpb.FileDescriptorProto{
+                               {Name: proto.String("d.proto"), Dependency: []string{"b.proto", "c.proto"}},
+                               {Name: proto.String("b.proto"), Dependency: []string{"a.proto"}},
+                               {Name: proto.String("c.proto"), Dependency: []string{"a.proto"}},
+                               {Name: proto.String("a.proto")},
+                       },
+                       // a must come first, b and c can be in any order, d must come last
+                       wantOrder: nil, // We'll check constraints instead
+               },
+               {
+                       name: "external dependency (not in list)",
+                       fileDescs: []*descriptorpb.FileDescriptorProto{
+                               {Name: proto.String("b.proto"), Dependency: []string{"external.proto"}},
+                               {Name: proto.String("a.proto")},
+                       },
+                       wantOrder: nil, // Just verify no error
+               },
+               {
+                       name:      "empty list",
+                       fileDescs: []*descriptorpb.FileDescriptorProto{},
+                       wantOrder: []string{},
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result, err := rm.topologicalSort(tt.fileDescs)
+                       if err != nil {
+                               t.Fatalf("topologicalSort() error = %v", err)
+                       }
+
+                       if tt.wantOrder != nil {
+                               // Exact order check
+                               if len(result) != len(tt.wantOrder) {
+                                       t.Errorf("got %d files, want %d", len(result), len(tt.wantOrder))
+                                       return
+                               }
+                               for i, fd := range result {
+                                       if fd.GetName() != tt.wantOrder[i] {
+                                               t.Errorf("position %d: got %s, want %s", i, fd.GetName(), tt.wantOrder[i])
+                                       }
+                               }
+                       } else {
+                               // Just verify length matches and no error
+                               if len(result) != len(tt.fileDescs) {
+                                       t.Errorf("got %d files, want %d", len(result), len(tt.fileDescs))
+                               }
+                       }
+               })
+       }
+}
+
+func TestReflectionManager_TopologicalSort_DependencyOrder(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       // Create a complex dependency graph
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {Name: proto.String("leaf1.proto"), Dependency: []string{"mid.proto"}},
+               {Name: proto.String("leaf2.proto"), Dependency: []string{"mid.proto"}},
+               {Name: proto.String("mid.proto"), Dependency: []string{"root.proto"}},
+               {Name: proto.String("root.proto")},
+       }
+
+       result, err := rm.topologicalSort(fileDescs)
+       if err != nil {
+               t.Fatalf("topologicalSort() error = %v", err)
+       }
+
+       // Build position map
+       position := make(map[string]int)
+       for i, fd := range result {
+               position[fd.GetName()] = i
+       }
+
+       // Verify dependency constraints
+       // root should come before mid
+       if position["root.proto"] >= position["mid.proto"] {
+               t.Error("root.proto should come before mid.proto")
+       }
+       // mid should come before leaf1 and leaf2
+       if position["mid.proto"] >= position["leaf1.proto"] {
+               t.Error("mid.proto should come before leaf1.proto")
+       }
+       if position["mid.proto"] >= position["leaf2.proto"] {
+               t.Error("mid.proto should come before leaf2.proto")
+       }
+}
+
+func TestReflectionManager_BuildFileRegistry(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       // Create a simple proto file descriptor
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {
+                       Name:    proto.String("test.proto"),
+                       Package: proto.String("test"),
+                       MessageType: []*descriptorpb.DescriptorProto{
+                               {
+                                       Name: proto.String("TestMessage"),
+                                       Field: []*descriptorpb.FieldDescriptorProto{
+                                               {
+                                                       Name:   proto.String("value"),
+                                                       Number: proto.Int32(1),
+                                                       Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+                                                       Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+
+       registry, err := rm.buildFileRegistry(fileDescs)
+       if err != nil {
+               t.Fatalf("buildFileRegistry() error = %v", err)
+       }
+
+       if registry == nil {
+               t.Fatal("buildFileRegistry() returned nil registry")
+       }
+
+       // Verify the file was registered
+       fd, err := registry.FindFileByPath("test.proto")
+       if err != nil {
+               t.Fatalf("FindFileByPath() error = %v", err)
+       }
+
+       if fd.Package().Name() != "test" {
+               t.Errorf("package = %q, want %q", fd.Package().Name(), "test")
+       }
+}
+
+func TestReflectionManager_BuildFileRegistry_WithDependencies(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       // Create proto files with dependency
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {
+                       Name:    proto.String("base.proto"),
+                       Package: proto.String("base"),
+                       MessageType: []*descriptorpb.DescriptorProto{
+                               {
+                                       Name: proto.String("BaseMessage"),
+                                       Field: []*descriptorpb.FieldDescriptorProto{
+                                               {
+                                                       Name:   proto.String("id"),
+                                                       Number: proto.Int32(1),
+                                                       Type:   descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
+                                                       Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                               },
+                                       },
+                               },
+                       },
+               },
+               {
+                       Name:       proto.String("derived.proto"),
+                       Package:    proto.String("derived"),
+                       Dependency: []string{"base.proto"},
+                       MessageType: []*descriptorpb.DescriptorProto{
+                               {
+                                       Name: proto.String("DerivedMessage"),
+                                       Field: []*descriptorpb.FieldDescriptorProto{
+                                               {
+                                                       Name:     proto.String("base"),
+                                                       Number:   proto.Int32(1),
+                                                       Type:     descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
+                                                       TypeName: proto.String(".base.BaseMessage"),
+                                                       Label:    descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+
+       registry, err := rm.buildFileRegistry(fileDescs)
+       if err != nil {
+               t.Fatalf("buildFileRegistry() error = %v", err)
+       }
+
+       // Verify both files were registered
+       _, err = registry.FindFileByPath("base.proto")
+       if err != nil {
+               t.Errorf("FindFileByPath(base.proto) error = %v", err)
+       }
+
+       _, err = registry.FindFileByPath("derived.proto")
+       if err != nil {
+               t.Errorf("FindFileByPath(derived.proto) error = %v", err)
+       }
+}
+
+func TestReflectionManager_BuildFileRegistry_Empty(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       registry, err := rm.buildFileRegistry([]*descriptorpb.FileDescriptorProto{})
+       if err != nil {
+               t.Fatalf("buildFileRegistry() error = %v", err)
+       }
+
+       if registry == nil {
+               t.Error("buildFileRegistry() returned nil for empty input")
+       }
+}
+
+func TestReflectionManager_BuildFileRegistry_WithService(t *testing.T) {
+       rm := NewReflectionManager(5 * time.Minute)
+       defer rm.Close()
+
+       fileDescs := []*descriptorpb.FileDescriptorProto{
+               {
+                       Name:    proto.String("service.proto"),
+                       Package: proto.String("myservice"),
+                       MessageType: []*descriptorpb.DescriptorProto{
+                               {
+                                       Name: proto.String("Request"),
+                                       Field: []*descriptorpb.FieldDescriptorProto{
+                                               {
+                                                       Name:   proto.String("query"),
+                                                       Number: proto.Int32(1),
+                                                       Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+                                                       Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                               },
+                                       },
+                               },
+                               {
+                                       Name: proto.String("Response"),
+                                       Field: []*descriptorpb.FieldDescriptorProto{
+                                               {
+                                                       Name:   proto.String("result"),
+                                                       Number: proto.Int32(1),
+                                                       Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
+                                                       Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
+                                               },
+                                       },
+                               },
+                       },
+                       Service: []*descriptorpb.ServiceDescriptorProto{
+                               {
+                                       Name: proto.String("MyService"),
+                                       Method: []*descriptorpb.MethodDescriptorProto{
+                                               {
+                                                       Name:       proto.String("Search"),
+                                                       InputType:  proto.String(".myservice.Request"),
+                                                       OutputType: proto.String(".myservice.Response"),
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+
+       registry, err := rm.buildFileRegistry(fileDescs)
+       if err != nil {
+               t.Fatalf("buildFileRegistry() error = %v", err)
+       }
+
+       // Find service descriptor
+       desc, err := registry.FindDescriptorByName("myservice.MyService")
+       if err != nil {
+               t.Fatalf("FindDescriptorByName() error = %v", err)
+       }
+
+       if desc.Name() != "MyService" {
+               t.Errorf("service name = %q, want %q", desc.Name(), "MyService")
+       }
+}