This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu-samples.git
The following commit(s) were added to refs/heads/main by this push:
new 3b627fe feat(grpc): add gRPC Server Reflection demo for issue #821
(#116)
3b627fe is described below
commit 3b627fe3c71df89335a9a4ba5e5f2b570942e89e
Author: Tsukikage <[email protected]>
AuthorDate: Tue Jan 6 17:12:22 2026 +0800
feat(grpc): add gRPC Server Reflection demo for issue #821 (#116)
* feat(grpc): add gRPC Server Reflection demo for issue #821
Add a complete demo showcasing gRPC Server Reflection support with:
- EchoService with all 4 RPC types (unary, server/client/bidirectional
streaming)
- Server with reflection.Register() enabled
- Three Pixiu config examples (passthrough, reflection, hybrid modes)
- Client and integration tests
- Bilingual documentation (EN/CN)
Ref: https://github.com/apache/dubbo-go-pixiu/issues/821
Ref: https://github.com/apache/dubbo-go-pixiu/pull/849
* fix(grpc): replace deprecated grpc.Dial with grpc.NewClient
grpc.Dial is deprecated since gRPC-Go 1.x, use NewClient instead
* docs: update README and add gRPC reflection demo to integration test
- Add gRPC reflection demo description to README.md and README_CN.md
- Add grpc/reflection to start_integrate_test.sh for CI integration
- Unify pixiu config ports to 8881 for reflection and hybrid modes
- Update go.mod to use pkg/errors as direct dependency
---
README.md | 5 +
README_CN.md | 5 +
go.mod | 2 +-
grpc/reflection/README.md | 145 +++++++++++++++
grpc/reflection/README_CN.md | 145 +++++++++++++++
grpc/reflection/client/client.go | 216 ++++++++++++++++++++++
grpc/reflection/pixiu/conf-hybrid.yaml | 75 ++++++++
grpc/reflection/pixiu/conf-passthrough.yaml | 67 +++++++
grpc/reflection/pixiu/conf-reflection.yaml | 72 ++++++++
grpc/reflection/pixiu/conf.yaml | 75 ++++++++
grpc/reflection/proto/echo.pb.go | 274 ++++++++++++++++++++++++++++
grpc/reflection/proto/echo.proto | 68 +++++++
grpc/reflection/proto/echo_grpc.pb.go | 256 ++++++++++++++++++++++++++
grpc/reflection/server/server.go | 182 ++++++++++++++++++
grpc/reflection/test/reflection_test.go | 211 +++++++++++++++++++++
start_integrate_test.sh | 1 +
16 files changed, 1798 insertions(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 39cce3c..57afe11 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,11 @@ This project includes multiple samples covering conversions
such as HTTP to Dubb
* **grpc/simple**: Demonstrates how to use Pixiu as a gateway for standard
gRPC services, supporting unary calls, client streaming, server streaming, and
bidirectional streaming.
+* **grpc/reflection**: Demonstrates gRPC Server Reflection with Pixiu,
supporting three reflection modes:
+ * `passthrough` - High-performance binary pass-through mode
+ * `reflection` - Full dynamic message decoding using server reflection
+ * `hybrid` - Reflection with passthrough fallback for optimal flexibility
+
* **http/grpc**: Converts HTTP requests to gRPC requests, supporting
configuration via proto files or dynamic retrieval from a gRPC server with
reflection enabled.
* **http/simple**: Common HTTP proxy examples demonstrating typical API
gateway functionality.
diff --git a/README_CN.md b/README_CN.md
index 2755f12..865f9ba 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -37,6 +37,11 @@
- grpc/simple: 演示了如何使用 Pixiu 作为标准 gRPC 服务的网关,支持一元调用、客户端流、服务端流和双向流通信。
+- grpc/reflection: 演示了 gRPC 服务端反射与 Pixiu 的集成,支持三种反射模式:
+ - `passthrough` - 高性能二进制透传模式
+ - `reflection` - 使用服务端反射进行完整的动态消息解码
+ - `hybrid` - 反射与透传回退相结合,提供最佳灵活性
+
- http/grpc:将http请求转换为 grpc 请求,支持配置 proto 文件或动态从开启反射功能的 grpc server中获取 proto 信息
- http/simple:此目录包含常见的 Http 请求代理功能,作为常见的 API 网关
diff --git a/go.mod b/go.mod
index 0ca6cad..3224979 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/joho/godotenv v1.5.1
github.com/openai/openai-go v1.12.0
+ github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.6.0
github.com/stretchr/testify v1.11.1
github.com/uber/jaeger-client-go v2.29.1+incompatible
@@ -111,7 +112,6 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
- github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polarismesh/polaris-go v1.3.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c //
indirect
diff --git a/grpc/reflection/README.md b/grpc/reflection/README.md
new file mode 100644
index 0000000..8c51fbf
--- /dev/null
+++ b/grpc/reflection/README.md
@@ -0,0 +1,145 @@
+# gRPC Server Reflection Example
+
+[中文](./README_CN.md)
+
+This example demonstrates how to use Apache Dubbo-go-pixiu as a gateway with
**gRPC Server Reflection** support. This feature enables dynamic message
parsing and inspection at the gateway level without requiring pre-compiled
proto files.
+
+> **Related Issue**: [#821 - Add gRPC Server Reflection
support](https://github.com/apache/dubbo-go-pixiu/issues/821)
+
+## Overview
+
+The gRPC Server Reflection feature (implemented in [PR
#849](https://github.com/apache/dubbo-go-pixiu/pull/849) to resolve [Issue
#821](https://github.com/apache/dubbo-go-pixiu/issues/821)) provides three
reflection modes:
+
+| Mode | Description | Use Case |
+|------|-------------|----------|
+| **passthrough** | Binary pass-through without decoding (default) |
High-performance scenarios where message inspection is not needed |
+| **reflection** | Full dynamic message decoding via gRPC reflection |
Content-based routing, field filtering, message transformation |
+| **hybrid** | Reflection with passthrough fallback | Mixed environments with
varying reflection support |
+
+## Project Structure
+
+```
+grpc/reflection/
+├── proto/
+│ ├── echo.proto # Service definition
+│ ├── echo.pb.go # Generated protobuf code
+│ └── echo_grpc.pb.go # Generated gRPC code
+├── server/
+│ └── server.go # gRPC server with reflection enabled
+├── client/
+│ └── client.go # Test client
+├── pixiu/
+│ ├── conf.yaml # Default configuration (reflection mode)
+│ ├── conf-passthrough.yaml # Passthrough mode configuration
+│ ├── conf-reflection.yaml # Reflection mode configuration
+│ └── conf-hybrid.yaml # Hybrid mode configuration
+├── test/
+│ └── reflection_test.go # Integration tests
+├── README.md
+└── README_CN.md
+```
+
+## Prerequisites
+
+- Go 1.21+
+- Protocol Buffers compiler (protoc)
+- Apache Dubbo-go-pixiu with gRPC Server Reflection support ([PR
#849](https://github.com/apache/dubbo-go-pixiu/pull/849))
+
+## How to Run
+
+You will need three separate terminal windows.
+
+### 1. Start the gRPC Server
+
+In the first terminal, navigate to the project root and start the gRPC server
with reflection enabled:
+
+```sh
+# Terminal 1
+cd dubbo-go-pixiu-samples
+go run grpc/reflection/server/server.go
+```
+
+The server will start on `localhost:50051` with gRPC Server Reflection enabled.
+
+### 2. Start the Pixiu Gateway
+
+In the second terminal, start the Pixiu gateway with your preferred reflection
mode:
+
+```sh
+# Terminal 2: Using reflection mode (default)
+cd dubbo-go-pixiu-samples
+go run pixiu/*.go gateway start -c grpc/reflection/pixiu/conf.yaml
+
+# Alternative: Use passthrough mode
+# go run pixiu/*.go gateway start -c
grpc/reflection/pixiu/conf-passthrough.yaml
+
+# Alternative: Use hybrid mode
+# go run pixiu/*.go gateway start -c grpc/reflection/pixiu/conf-hybrid.yaml
+```
+
+Pixiu will listen for gRPC requests on `localhost:8881`.
+
+### 3. Run the Client
+
+In the third terminal, run the test client:
+
+```sh
+# Terminal 3
+cd dubbo-go-pixiu-samples
+go run grpc/reflection/client/client.go
+```
+
+The client will send requests to Pixiu, which proxies them to the gRPC server.
You should see output demonstrating all three RPC types (unary, server
streaming, and bidirectional streaming).
+
+## Running Tests
+
+Ensure the gRPC server and Pixiu gateway are running, then execute:
+
+```sh
+go test -v ./grpc/reflection/test/
+```
+
+## Configuration Details
+
+### Reflection Mode Configuration
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # Options: "passthrough" | "reflection" | "hybrid"
+ reflection_mode: "reflection"
+
+ # Cache TTL for method descriptors (seconds)
+ descriptor_cache_ttl: 300
+
+ # Enable Triple protocol detection
+ enable_protocol_detection: true
+
+ # Timeout for reflection operations (hybrid mode)
+ reflection_timeout: 5s
+```
+
+### Key Features
+
+1. **Dynamic Message Decoding**: Parse and inspect gRPC messages at runtime
without proto files
+2. **TTL-based Caching**: Efficient caching of method descriptors with
automatic cleanup
+3. **Protocol Detection**: Support for both gRPC and Triple protocol
compatibility
+4. **Fallback Mechanism**: Hybrid mode provides graceful degradation
+
+## Enabling Server Reflection
+
+To enable gRPC Server Reflection on your server, add this line after
registering your service:
+
+```go
+import "google.golang.org/grpc/reflection"
+
+// After registering your service
+reflection.Register(grpcServer)
+```
+
+## Related Resources
+
+- [Issue #821: Add gRPC Server Reflection
support](https://github.com/apache/dubbo-go-pixiu/issues/821) - Original
feature request
+- [PR #849: Implementation](https://github.com/apache/dubbo-go-pixiu/pull/849)
- Pull request implementing this feature
+- [gRPC Server Reflection
Protocol](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) -
Official gRPC reflection specification
diff --git a/grpc/reflection/README_CN.md b/grpc/reflection/README_CN.md
new file mode 100644
index 0000000..08d26d5
--- /dev/null
+++ b/grpc/reflection/README_CN.md
@@ -0,0 +1,145 @@
+# gRPC Server Reflection 示例
+
+[English](./README.md)
+
+本示例演示如何使用 Apache Dubbo-go-pixiu 作为支持 **gRPC Server Reflection**
的网关。该功能支持在网关层进行动态消息解析和检查,无需预编译的 proto 文件。
+
+> **相关 Issue**: [#821 - 添加 gRPC Server Reflection
支持](https://github.com/apache/dubbo-go-pixiu/issues/821)
+
+## 概述
+
+gRPC Server Reflection 功能(通过 [PR
#849](https://github.com/apache/dubbo-go-pixiu/pull/849) 实现,解决 [Issue
#821](https://github.com/apache/dubbo-go-pixiu/issues/821))提供三种反射模式:
+
+| 模式 | 描述 | 使用场景 |
+|------|------|----------|
+| **passthrough** | 二进制直通,不进行解码(默认) | 不需要消息检查的高性能场景 |
+| **reflection** | 通过 gRPC 反射进行完整的动态消息解码 | 基于内容的路由、字段过滤、消息转换 |
+| **hybrid** | 反射模式,失败时回退到直通模式 | 反射支持程度不一的混合环境 |
+
+## 项目结构
+
+```
+grpc/reflection/
+├── proto/
+│ ├── echo.proto # 服务定义
+│ ├── echo.pb.go # 生成的 protobuf 代码
+│ └── echo_grpc.pb.go # 生成的 gRPC 代码
+├── server/
+│ └── server.go # 启用反射的 gRPC 服务器
+├── client/
+│ └── client.go # 测试客户端
+├── pixiu/
+│ ├── conf.yaml # 默认配置(reflection 模式)
+│ ├── conf-passthrough.yaml # passthrough 模式配置
+│ ├── conf-reflection.yaml # reflection 模式配置
+│ └── conf-hybrid.yaml # hybrid 模式配置
+├── test/
+│ └── reflection_test.go # 集成测试
+├── README.md
+└── README_CN.md
+```
+
+## 前置条件
+
+- Go 1.21+
+- Protocol Buffers 编译器 (protoc)
+- 支持 gRPC Server Reflection 的 Apache Dubbo-go-pixiu([PR
#849](https://github.com/apache/dubbo-go-pixiu/pull/849))
+
+## 运行方法
+
+需要三个独立的终端窗口。
+
+### 1. 启动 gRPC 服务器
+
+在第一个终端中,导航到项目根目录并启动启用了反射的 gRPC 服务器:
+
+```sh
+# 终端 1
+cd dubbo-go-pixiu-samples
+go run grpc/reflection/server/server.go
+```
+
+服务器将在 `localhost:50051` 上启动,并启用 gRPC Server Reflection。
+
+### 2. 启动 Pixiu 网关
+
+在第二个终端中,使用您选择的反射模式启动 Pixiu 网关:
+
+```sh
+# 终端 2:使用 reflection 模式(默认)
+cd dubbo-go-pixiu-samples
+go run pixiu/*.go gateway start -c grpc/reflection/pixiu/conf.yaml
+
+# 可选:使用 passthrough 模式
+# go run pixiu/*.go gateway start -c
grpc/reflection/pixiu/conf-passthrough.yaml
+
+# 可选:使用 hybrid 模式
+# go run pixiu/*.go gateway start -c grpc/reflection/pixiu/conf-hybrid.yaml
+```
+
+Pixiu 将在 `localhost:8881` 上监听 gRPC 请求。
+
+### 3. 运行客户端
+
+在第三个终端中,运行测试客户端:
+
+```sh
+# 终端 3
+cd dubbo-go-pixiu-samples
+go run grpc/reflection/client/client.go
+```
+
+客户端将向 Pixiu 发送请求,Pixiu 将请求代理到 gRPC 服务器。您将看到演示所有三种 RPC 类型(一元调用、服务端流式、双向流式)的输出。
+
+## 运行测试
+
+确保 gRPC 服务器和 Pixiu 网关正在运行,然后执行:
+
+```sh
+go test -v ./grpc/reflection/test/
+```
+
+## 配置详情
+
+### 反射模式配置
+
+```yaml
+grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # 选项: "passthrough" | "reflection" | "hybrid"
+ reflection_mode: "reflection"
+
+ # 方法描述符缓存 TTL(秒)
+ descriptor_cache_ttl: 300
+
+ # 启用 Triple 协议检测
+ enable_protocol_detection: true
+
+ # 反射操作超时时间(hybrid 模式)
+ reflection_timeout: 5s
+```
+
+### 核心特性
+
+1. **动态消息解码**:运行时解析和检查 gRPC 消息,无需 proto 文件
+2. **基于 TTL 的缓存**:高效缓存方法描述符,支持自动清理
+3. **协议检测**:支持 gRPC 和 Triple 协议兼容性
+4. **回退机制**:hybrid 模式提供优雅降级
+
+## 启用服务端反射
+
+要在您的服务器上启用 gRPC Server Reflection,在注册服务后添加以下代码:
+
+```go
+import "google.golang.org/grpc/reflection"
+
+// 注册服务后
+reflection.Register(grpcServer)
+```
+
+## 相关资源
+
+- [Issue #821: 添加 gRPC Server Reflection
支持](https://github.com/apache/dubbo-go-pixiu/issues/821) - 原始功能请求
+- [PR #849: 功能实现](https://github.com/apache/dubbo-go-pixiu/pull/849) - 实现此功能的
Pull Request
+- [gRPC Server Reflection
协议](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) - 官方
gRPC 反射规范
diff --git a/grpc/reflection/client/client.go b/grpc/reflection/client/client.go
new file mode 100644
index 0000000..2cea8fb
--- /dev/null
+++ b/grpc/reflection/client/client.go
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package main implements a gRPC client to test the EchoService through Pixiu.
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "time"
+)
+
+import (
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+import (
+ pb "github.com/dubbo-go-pixiu/samples/grpc/reflection/proto"
+)
+
+var (
+ serverAddr = flag.String("addr", "localhost:8881", "The Pixiu gateway
address")
+)
+
+func main() {
+ flag.Parse()
+
+ // Connect to Pixiu gateway (or directly to gRPC server)
+ conn, err := grpc.NewClient(*serverAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ log.Fatalf("failed to connect: %v", err)
+ }
+ defer conn.Close()
+
+ client := pb.NewEchoServiceClient(conn)
+
+ // Test all RPC types
+ testUnaryEcho(client)
+ testStreamEcho(client)
+ testClientStreamEcho(client)
+ testBidirectionalEcho(client)
+
+ log.Println("All tests completed successfully!")
+}
+
+// testUnaryEcho tests the simple unary RPC.
+func testUnaryEcho(client pb.EchoServiceClient) {
+ log.Println("=== Testing Unary Echo ===")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ req := &pb.EchoRequest{
+ Message: "Hello, gRPC Server Reflection!",
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "client": "demo-client",
+ "version": "1.0.0",
+ },
+ }
+
+ resp, err := client.Echo(ctx, req)
+ if err != nil {
+ log.Fatalf("Echo failed: %v", err)
+ }
+
+ log.Printf("Response message: %s", resp.Message)
+ log.Printf("Server ID: %s", resp.ServerId)
+ log.Printf("Reflection enabled: %v", resp.ReflectionEnabled)
+ log.Printf("Server timestamp: %d", resp.ServerTimestamp)
+ log.Println()
+}
+
+// testStreamEcho tests server-side streaming RPC.
+func testStreamEcho(client pb.EchoServiceClient) {
+ log.Println("=== Testing Stream Echo (Server Streaming) ===")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ req := &pb.EchoRequest{
+ Message: "Stream message",
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "stream_type": "server",
+ },
+ }
+
+ stream, err := client.StreamEcho(ctx, req)
+ if err != nil {
+ log.Fatalf("StreamEcho failed: %v", err)
+ }
+
+ count := 0
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Fatalf("StreamEcho recv failed: %v", err)
+ }
+ count++
+ log.Printf("Received [%d]: %s (server: %s)", count,
resp.Message, resp.ServerId)
+ }
+ log.Printf("Stream completed, received %d messages", count)
+ log.Println()
+}
+
+// testClientStreamEcho tests client-side streaming RPC.
+func testClientStreamEcho(client pb.EchoServiceClient) {
+ log.Println("=== Testing Client Stream Echo (Client Streaming) ===")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ stream, err := client.ClientStreamEcho(ctx)
+ if err != nil {
+ log.Fatalf("ClientStreamEcho failed: %v", err)
+ }
+
+ // Send multiple messages
+ messages := []string{"First", "Second", "Third", "Fourth", "Fifth"}
+ for _, msg := range messages {
+ req := &pb.EchoRequest{
+ Message: msg,
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "stream_type": "client",
+ },
+ }
+ if err := stream.Send(req); err != nil {
+ log.Fatalf("ClientStreamEcho send failed: %v", err)
+ }
+ log.Printf("Sent: %s", msg)
+ }
+
+ // Close and receive response
+ resp, err := stream.CloseAndRecv()
+ if err != nil {
+ log.Fatalf("ClientStreamEcho close failed: %v", err)
+ }
+ log.Printf("Response: %s (server: %s)", resp.Message, resp.ServerId)
+ log.Println()
+}
+
+// testBidirectionalEcho tests bidirectional streaming RPC.
+func testBidirectionalEcho(client pb.EchoServiceClient) {
+ log.Println("=== Testing Bidirectional Echo ===")
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ stream, err := client.BidirectionalEcho(ctx)
+ if err != nil {
+ log.Fatalf("BidirectionalEcho failed: %v", err)
+ }
+
+ // Send and receive in goroutines
+ messages := []string{
+ "First bidirectional message",
+ "Second bidirectional message",
+ "Third bidirectional message",
+ }
+
+ // Send messages
+ go func() {
+ for i, msg := range messages {
+ req := &pb.EchoRequest{
+ Message: msg,
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "index": fmt.Sprintf("%d", i),
+ },
+ }
+ if err := stream.Send(req); err != nil {
+ log.Fatalf("BidirectionalEcho send failed: %v",
err)
+ }
+ log.Printf("Sent: %s", msg)
+ time.Sleep(100 * time.Millisecond)
+ }
+ stream.CloseSend()
+ }()
+
+ // Receive responses
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Fatalf("BidirectionalEcho recv failed: %v", err)
+ }
+ log.Printf("Received: %s (server: %s)", resp.Message,
resp.ServerId)
+ }
+ log.Println("Bidirectional stream completed")
+ log.Println()
+}
diff --git a/grpc/reflection/pixiu/conf-hybrid.yaml
b/grpc/reflection/pixiu/conf-hybrid.yaml
new file mode 100644
index 0000000..291b66a
--- /dev/null
+++ b/grpc/reflection/pixiu/conf-hybrid.yaml
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# ============================================================
+# HYBRID MODE Configuration
+# ============================================================
+# This mode combines passthrough and reflection approaches.
+# It attempts to use reflection for message decoding but falls
+# back to passthrough if reflection fails or is unavailable.
+# Best for mixed environments with varying reflection support.
+# ============================================================
+---
+static_resources:
+ listeners:
+ - name: "grpc-reflection-hybrid"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8881
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ cluster_not_found_response_code: 404
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # HYBRID MODE: Reflection with passthrough fallback
+ # Attempts dynamic decoding, falls back to binary
pass-through
+ # if reflection is unavailable or fails
+ reflection_mode: "hybrid"
+ # TTL for descriptor cache (in seconds)
+ descriptor_cache_ttl: 300
+ # Enable protocol detection for Triple compatibility
+ enable_protocol_detection: true
+ # Timeout for reflection operations before fallback
+ reflection_timeout: 5s
+ config:
+ idle_timeout: 5s
+ read_timeout: 5s
+ write_timeout: 5s
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+ shutdown_config:
+ timeout: "60s"
+ step_timeout: "10s"
+ reject_policy: "immediacy"
diff --git a/grpc/reflection/pixiu/conf-passthrough.yaml
b/grpc/reflection/pixiu/conf-passthrough.yaml
new file mode 100644
index 0000000..e9ef5fa
--- /dev/null
+++ b/grpc/reflection/pixiu/conf-passthrough.yaml
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# ============================================================
+# PASSTHROUGH MODE Configuration
+# ============================================================
+# This is the default high-performance mode where messages are
+# passed through as binary without decoding. Best for scenarios
+# where message inspection is not required.
+# ============================================================
+---
+static_resources:
+ listeners:
+ - name: "grpc-reflection-passthrough"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8881
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ cluster_not_found_response_code: 404
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # PASSTHROUGH MODE: Binary pass-through without decoding
+ # This is the default and most performant mode
+ reflection_mode: "passthrough"
+ config:
+ idle_timeout: 5s
+ read_timeout: 5s
+ write_timeout: 5s
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+ shutdown_config:
+ timeout: "60s"
+ step_timeout: "10s"
+ reject_policy: "immediacy"
diff --git a/grpc/reflection/pixiu/conf-reflection.yaml
b/grpc/reflection/pixiu/conf-reflection.yaml
new file mode 100644
index 0000000..5a8eb79
--- /dev/null
+++ b/grpc/reflection/pixiu/conf-reflection.yaml
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# ============================================================
+# REFLECTION MODE Configuration
+# ============================================================
+# This mode uses gRPC Server Reflection to dynamically decode
+# and inspect message contents. Enables content-based routing,
+# field filtering, and message manipulation at the gateway.
+# ============================================================
+---
+static_resources:
+ listeners:
+ - name: "grpc-reflection-full"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8881
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ cluster_not_found_response_code: 404
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # REFLECTION MODE: Full dynamic message decoding
+ # Uses gRPC Server Reflection to discover service methods
+ # and decode messages at runtime
+ reflection_mode: "reflection"
+ # TTL for descriptor cache (in seconds)
+ descriptor_cache_ttl: 300
+ # Enable protocol detection for Triple compatibility
+ enable_protocol_detection: true
+ config:
+ idle_timeout: 5s
+ read_timeout: 5s
+ write_timeout: 5s
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+ shutdown_config:
+ timeout: "60s"
+ step_timeout: "10s"
+ reject_policy: "immediacy"
diff --git a/grpc/reflection/pixiu/conf.yaml b/grpc/reflection/pixiu/conf.yaml
new file mode 100644
index 0000000..07a610d
--- /dev/null
+++ b/grpc/reflection/pixiu/conf.yaml
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# ============================================================
+# gRPC Server Reflection Demo - Default Configuration
+# ============================================================
+# This configuration demonstrates the gRPC Server Reflection
+# feature with the "reflection" mode enabled.
+#
+# For other modes, see:
+# - conf-passthrough.yaml: Binary pass-through (default, high performance)
+# - conf-reflection.yaml: Full dynamic decoding via reflection
+# - conf-hybrid.yaml: Reflection with passthrough fallback
+# ============================================================
+---
+static_resources:
+ listeners:
+ - name: "grpc-reflection-demo"
+ protocol_type: "GRPC"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8881
+ filter_chains:
+ filters:
+ - name: dgp.filter.network.grpcconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: "/echo.EchoService/"
+ route:
+ cluster: "echo-grpc"
+ cluster_not_found_response_code: 404
+ grpc_filters:
+ - name: dgp.filter.grpc.proxy
+ config:
+ # Enable gRPC Server Reflection mode
+ # Options: "passthrough" | "reflection" | "hybrid"
+ reflection_mode: "reflection"
+ # Cache TTL for method descriptors (seconds)
+ descriptor_cache_ttl: 300
+ # Enable Triple protocol detection
+ enable_protocol_detection: true
+ config:
+ idle_timeout: 5s
+ read_timeout: 5s
+ write_timeout: 5s
+ clusters:
+ - name: "echo-grpc"
+ lb_policy: "RoundRobin"
+ endpoints:
+ - socket_address:
+ address: 127.0.0.1
+ port: 50051
+ protocol_type: "GRPC"
+ shutdown_config:
+ timeout: "60s"
+ step_timeout: "10s"
+ reject_policy: "immediacy"
diff --git a/grpc/reflection/proto/echo.pb.go b/grpc/reflection/proto/echo.pb.go
new file mode 100644
index 0000000..5790e1c
--- /dev/null
+++ b/grpc/reflection/proto/echo.pb.go
@@ -0,0 +1,274 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.9
+// protoc v6.32.1
+// source: echo.proto
+
+package proto
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// EchoRequest contains the message to be echoed.
+type EchoRequest struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ // The message content to echo back.
+ Message string `protobuf:"bytes,1,opt,name=message,proto3"
json:"message,omitempty"`
+ // Optional metadata for testing field-level operations.
+ Metadata map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3"
json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key"
protobuf_val:"bytes,2,opt,name=value"`
+ // Timestamp for latency measurement.
+ Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3"
json:"timestamp,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *EchoRequest) Reset() {
+ *x = EchoRequest{}
+ mi := &file_echo_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *EchoRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*EchoRequest) ProtoMessage() {}
+
+func (x *EchoRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_echo_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use EchoRequest.ProtoReflect.Descriptor instead.
+func (*EchoRequest) Descriptor() ([]byte, []int) {
+ return file_echo_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *EchoRequest) GetMessage() string {
+ if x != nil {
+ return x.Message
+ }
+ return ""
+}
+
+func (x *EchoRequest) GetMetadata() map[string]string {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+func (x *EchoRequest) GetTimestamp() int64 {
+ if x != nil {
+ return x.Timestamp
+ }
+ return 0
+}
+
+// EchoResponse contains the echoed message with server info.
+type EchoResponse struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ // The original message content.
+ Message string `protobuf:"bytes,1,opt,name=message,proto3"
json:"message,omitempty"`
+ // Server-side timestamp.
+ ServerTimestamp int64
`protobuf:"varint,2,opt,name=server_timestamp,json=serverTimestamp,proto3"
json:"server_timestamp,omitempty"`
+ // Indicates if reflection was used to decode the message.
+ ReflectionEnabled bool
`protobuf:"varint,3,opt,name=reflection_enabled,json=reflectionEnabled,proto3"
json:"reflection_enabled,omitempty"`
+ // Server identifier for load balancing verification.
+ ServerId string
`protobuf:"bytes,4,opt,name=server_id,json=serverId,proto3"
json:"server_id,omitempty"`
+ // Echo of the request metadata.
+ Metadata map[string]string
`protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty"
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *EchoResponse) Reset() {
+ *x = EchoResponse{}
+ mi := &file_echo_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *EchoResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*EchoResponse) ProtoMessage() {}
+
+func (x *EchoResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_echo_proto_msgTypes[1]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use EchoResponse.ProtoReflect.Descriptor instead.
+func (*EchoResponse) Descriptor() ([]byte, []int) {
+ return file_echo_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *EchoResponse) GetMessage() string {
+ if x != nil {
+ return x.Message
+ }
+ return ""
+}
+
+func (x *EchoResponse) GetServerTimestamp() int64 {
+ if x != nil {
+ return x.ServerTimestamp
+ }
+ return 0
+}
+
+func (x *EchoResponse) GetReflectionEnabled() bool {
+ if x != nil {
+ return x.ReflectionEnabled
+ }
+ return false
+}
+
+func (x *EchoResponse) GetServerId() string {
+ if x != nil {
+ return x.ServerId
+ }
+ return ""
+}
+
+func (x *EchoResponse) GetMetadata() map[string]string {
+ if x != nil {
+ return x.Metadata
+ }
+ return nil
+}
+
+var File_echo_proto protoreflect.FileDescriptor
+
+const file_echo_proto_rawDesc = "" +
+ "\n" +
+ "\n" +
+ "echo.proto\x12\x04echo\"\xbf\x01\n" +
+ "\vEchoRequest\x12\x18\n" +
+ "\amessage\x18\x01 \x01(\tR\amessage\x12;\n" +
+ "\bmetadata\x18\x02
\x03(\v2\x1f.echo.EchoRequest.MetadataEntryR\bmetadata\x12\x1c\n" +
+ "\ttimestamp\x18\x03 \x01(\x03R\ttimestamp\x1a;\n" +
+ "\rMetadataEntry\x12\x10\n" +
+ "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9a\x02\n" +
+ "\fEchoResponse\x12\x18\n" +
+ "\amessage\x18\x01 \x01(\tR\amessage\x12)\n" +
+ "\x10server_timestamp\x18\x02 \x01(\x03R\x0fserverTimestamp\x12-\n" +
+ "\x12reflection_enabled\x18\x03
\x01(\bR\x11reflectionEnabled\x12\x1b\n" +
+ "\tserver_id\x18\x04 \x01(\tR\bserverId\x12<\n" +
+ "\bmetadata\x18\x05 \x03(\v2
.echo.EchoResponse.MetadataEntryR\bmetadata\x1a;\n" +
+ "\rMetadataEntry\x12\x10\n" +
+ "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value:\x028\x012\xf0\x01\n" +
+ "\vEchoService\x12-\n" +
+ "\x04Echo\x12\x11.echo.EchoRequest\x1a\x12.echo.EchoResponse\x125\n" +
+ "\n" +
+
"StreamEcho\x12\x11.echo.EchoRequest\x1a\x12.echo.EchoResponse0\x01\x12;\n" +
+
"\x10ClientStreamEcho\x12\x11.echo.EchoRequest\x1a\x12.echo.EchoResponse(\x01\x12>\n"
+
+
"\x11BidirectionalEcho\x12\x11.echo.EchoRequest\x1a\x12.echo.EchoResponse(\x010\x01B9Z7github.com/dubbo-go-pixiu/samples/grpc/reflection/protob\x06proto3"
+
+var (
+ file_echo_proto_rawDescOnce sync.Once
+ file_echo_proto_rawDescData []byte
+)
+
+func file_echo_proto_rawDescGZIP() []byte {
+ file_echo_proto_rawDescOnce.Do(func() {
+ file_echo_proto_rawDescData =
protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_echo_proto_rawDesc),
len(file_echo_proto_rawDesc)))
+ })
+ return file_echo_proto_rawDescData
+}
+
+var file_echo_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_echo_proto_goTypes = []any{
+ (*EchoRequest)(nil), // 0: echo.EchoRequest
+ (*EchoResponse)(nil), // 1: echo.EchoResponse
+ nil, // 2: echo.EchoRequest.MetadataEntry
+ nil, // 3: echo.EchoResponse.MetadataEntry
+}
+var file_echo_proto_depIdxs = []int32{
+ 2, // 0: echo.EchoRequest.metadata:type_name ->
echo.EchoRequest.MetadataEntry
+ 3, // 1: echo.EchoResponse.metadata:type_name ->
echo.EchoResponse.MetadataEntry
+ 0, // 2: echo.EchoService.Echo:input_type -> echo.EchoRequest
+ 0, // 3: echo.EchoService.StreamEcho:input_type -> echo.EchoRequest
+ 0, // 4: echo.EchoService.ClientStreamEcho:input_type ->
echo.EchoRequest
+ 0, // 5: echo.EchoService.BidirectionalEcho:input_type ->
echo.EchoRequest
+ 1, // 6: echo.EchoService.Echo:output_type -> echo.EchoResponse
+ 1, // 7: echo.EchoService.StreamEcho:output_type -> echo.EchoResponse
+ 1, // 8: echo.EchoService.ClientStreamEcho:output_type ->
echo.EchoResponse
+ 1, // 9: echo.EchoService.BidirectionalEcho:output_type ->
echo.EchoResponse
+ 6, // [6:10] is the sub-list for method output_type
+ 2, // [2:6] is the sub-list for method input_type
+ 2, // [2:2] is the sub-list for extension type_name
+ 2, // [2:2] is the sub-list for extension extendee
+ 0, // [0:2] is the sub-list for field type_name
+}
+
+func init() { file_echo_proto_init() }
+func file_echo_proto_init() {
+ if File_echo_proto != nil {
+ return
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor:
unsafe.Slice(unsafe.StringData(file_echo_proto_rawDesc),
len(file_echo_proto_rawDesc)),
+ NumEnums: 0,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 1,
+ },
+ GoTypes: file_echo_proto_goTypes,
+ DependencyIndexes: file_echo_proto_depIdxs,
+ MessageInfos: file_echo_proto_msgTypes,
+ }.Build()
+ File_echo_proto = out.File
+ file_echo_proto_goTypes = nil
+ file_echo_proto_depIdxs = nil
+}
diff --git a/grpc/reflection/proto/echo.proto b/grpc/reflection/proto/echo.proto
new file mode 100644
index 0000000..bde8d75
--- /dev/null
+++ b/grpc/reflection/proto/echo.proto
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package echo;
+
+option go_package = "github.com/dubbo-go-pixiu/samples/grpc/reflection/proto";
+
+// EchoService demonstrates gRPC Server Reflection capabilities.
+// This service is designed to test dynamic message parsing through reflection.
+service EchoService {
+ // Echo returns the message back with additional metadata.
+ rpc Echo(EchoRequest) returns (EchoResponse);
+
+ // StreamEcho demonstrates server streaming with reflection support.
+ rpc StreamEcho(EchoRequest) returns (stream EchoResponse);
+
+ // ClientStreamEcho demonstrates client streaming with reflection support.
+ rpc ClientStreamEcho(stream EchoRequest) returns (EchoResponse);
+
+ // BidirectionalEcho demonstrates bidirectional streaming.
+ rpc BidirectionalEcho(stream EchoRequest) returns (stream EchoResponse);
+}
+
+// EchoRequest contains the message to be echoed.
+message EchoRequest {
+ // The message content to echo back.
+ string message = 1;
+
+ // Optional metadata for testing field-level operations.
+ map<string, string> metadata = 2;
+
+ // Timestamp for latency measurement.
+ int64 timestamp = 3;
+}
+
+// EchoResponse contains the echoed message with server info.
+message EchoResponse {
+ // The original message content.
+ string message = 1;
+
+ // Server-side timestamp.
+ int64 server_timestamp = 2;
+
+ // Indicates if reflection was used to decode the message.
+ bool reflection_enabled = 3;
+
+ // Server identifier for load balancing verification.
+ string server_id = 4;
+
+ // Echo of the request metadata.
+ map<string, string> metadata = 5;
+}
diff --git a/grpc/reflection/proto/echo_grpc.pb.go
b/grpc/reflection/proto/echo_grpc.pb.go
new file mode 100644
index 0000000..9a52abf
--- /dev/null
+++ b/grpc/reflection/proto/echo_grpc.pb.go
@@ -0,0 +1,256 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.5.1
+// - protoc v6.32.1
+// source: echo.proto
+
+package proto
+
+import (
+ context "context"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.64.0 or later.
+const _ = grpc.SupportPackageIsVersion9
+
+const (
+ EchoService_Echo_FullMethodName = "/echo.EchoService/Echo"
+ EchoService_StreamEcho_FullMethodName =
"/echo.EchoService/StreamEcho"
+ EchoService_ClientStreamEcho_FullMethodName =
"/echo.EchoService/ClientStreamEcho"
+ EchoService_BidirectionalEcho_FullMethodName =
"/echo.EchoService/BidirectionalEcho"
+)
+
+// EchoServiceClient is the client API for EchoService service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please
refer to
https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+//
+// EchoService demonstrates gRPC Server Reflection capabilities.
+// This service is designed to test dynamic message parsing through reflection.
+type EchoServiceClient interface {
+ // Echo returns the message back with additional metadata.
+ Echo(ctx context.Context, in *EchoRequest, opts ...grpc.CallOption)
(*EchoResponse, error)
+ // StreamEcho demonstrates server streaming with reflection support.
+ StreamEcho(ctx context.Context, in *EchoRequest, opts
...grpc.CallOption) (grpc.ServerStreamingClient[EchoResponse], error)
+ // ClientStreamEcho demonstrates client streaming with reflection
support.
+ ClientStreamEcho(ctx context.Context, opts ...grpc.CallOption)
(grpc.ClientStreamingClient[EchoRequest, EchoResponse], error)
+ // BidirectionalEcho demonstrates bidirectional streaming.
+ BidirectionalEcho(ctx context.Context, opts ...grpc.CallOption)
(grpc.BidiStreamingClient[EchoRequest, EchoResponse], error)
+}
+
+type echoServiceClient struct {
+ cc grpc.ClientConnInterface
+}
+
+func NewEchoServiceClient(cc grpc.ClientConnInterface) EchoServiceClient {
+ return &echoServiceClient{cc}
+}
+
+func (c *echoServiceClient) Echo(ctx context.Context, in *EchoRequest, opts
...grpc.CallOption) (*EchoResponse, error) {
+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+ out := new(EchoResponse)
+ err := c.cc.Invoke(ctx, EchoService_Echo_FullMethodName, in, out,
cOpts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *echoServiceClient) StreamEcho(ctx context.Context, in *EchoRequest,
opts ...grpc.CallOption) (grpc.ServerStreamingClient[EchoResponse], error) {
+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+ stream, err := c.cc.NewStream(ctx, &EchoService_ServiceDesc.Streams[0],
EchoService_StreamEcho_FullMethodName, cOpts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &grpc.GenericClientStream[EchoRequest, EchoResponse]{ClientStream:
stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_StreamEchoClient = grpc.ServerStreamingClient[EchoResponse]
+
+func (c *echoServiceClient) ClientStreamEcho(ctx context.Context, opts
...grpc.CallOption) (grpc.ClientStreamingClient[EchoRequest, EchoResponse],
error) {
+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+ stream, err := c.cc.NewStream(ctx, &EchoService_ServiceDesc.Streams[1],
EchoService_ClientStreamEcho_FullMethodName, cOpts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &grpc.GenericClientStream[EchoRequest, EchoResponse]{ClientStream:
stream}
+ return x, nil
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_ClientStreamEchoClient =
grpc.ClientStreamingClient[EchoRequest, EchoResponse]
+
+func (c *echoServiceClient) BidirectionalEcho(ctx context.Context, opts
...grpc.CallOption) (grpc.BidiStreamingClient[EchoRequest, EchoResponse],
error) {
+ cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
+ stream, err := c.cc.NewStream(ctx, &EchoService_ServiceDesc.Streams[2],
EchoService_BidirectionalEcho_FullMethodName, cOpts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &grpc.GenericClientStream[EchoRequest, EchoResponse]{ClientStream:
stream}
+ return x, nil
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_BidirectionalEchoClient =
grpc.BidiStreamingClient[EchoRequest, EchoResponse]
+
+// EchoServiceServer is the server API for EchoService service.
+// All implementations must embed UnimplementedEchoServiceServer
+// for forward compatibility.
+//
+// EchoService demonstrates gRPC Server Reflection capabilities.
+// This service is designed to test dynamic message parsing through reflection.
+type EchoServiceServer interface {
+ // Echo returns the message back with additional metadata.
+ Echo(context.Context, *EchoRequest) (*EchoResponse, error)
+ // StreamEcho demonstrates server streaming with reflection support.
+ StreamEcho(*EchoRequest, grpc.ServerStreamingServer[EchoResponse]) error
+ // ClientStreamEcho demonstrates client streaming with reflection
support.
+ ClientStreamEcho(grpc.ClientStreamingServer[EchoRequest, EchoResponse])
error
+ // BidirectionalEcho demonstrates bidirectional streaming.
+ BidirectionalEcho(grpc.BidiStreamingServer[EchoRequest, EchoResponse])
error
+ mustEmbedUnimplementedEchoServiceServer()
+}
+
+// UnimplementedEchoServiceServer must be embedded to have
+// forward compatible implementations.
+//
+// NOTE: this should be embedded by value instead of pointer to avoid a nil
+// pointer dereference when methods are called.
+type UnimplementedEchoServiceServer struct{}
+
+func (UnimplementedEchoServiceServer) Echo(context.Context, *EchoRequest)
(*EchoResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Echo not
implemented")
+}
+func (UnimplementedEchoServiceServer) StreamEcho(*EchoRequest,
grpc.ServerStreamingServer[EchoResponse]) error {
+ return status.Errorf(codes.Unimplemented, "method StreamEcho not
implemented")
+}
+func (UnimplementedEchoServiceServer)
ClientStreamEcho(grpc.ClientStreamingServer[EchoRequest, EchoResponse]) error {
+ return status.Errorf(codes.Unimplemented, "method ClientStreamEcho not
implemented")
+}
+func (UnimplementedEchoServiceServer)
BidirectionalEcho(grpc.BidiStreamingServer[EchoRequest, EchoResponse]) error {
+ return status.Errorf(codes.Unimplemented, "method BidirectionalEcho not
implemented")
+}
+func (UnimplementedEchoServiceServer)
mustEmbedUnimplementedEchoServiceServer() {}
+func (UnimplementedEchoServiceServer) testEmbeddedByValue()
{}
+
+// UnsafeEchoServiceServer may be embedded to opt out of forward compatibility
for this service.
+// Use of this interface is not recommended, as added methods to
EchoServiceServer will
+// result in compilation errors.
+type UnsafeEchoServiceServer interface {
+ mustEmbedUnimplementedEchoServiceServer()
+}
+
+func RegisterEchoServiceServer(s grpc.ServiceRegistrar, srv EchoServiceServer)
{
+ // If the following call panics, it indicates
UnimplementedEchoServiceServer was
+ // embedded by pointer and is nil. This will cause panics if an
+ // unimplemented method is ever invoked, so we test this at
initialization
+ // time to prevent it from happening at runtime later due to I/O.
+ if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
+ t.testEmbeddedByValue()
+ }
+ s.RegisterService(&EchoService_ServiceDesc, srv)
+}
+
+func _EchoService_Echo_Handler(srv interface{}, ctx context.Context, dec
func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{},
error) {
+ in := new(EchoRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(EchoServiceServer).Echo(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: EchoService_Echo_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{},
error) {
+ return srv.(EchoServiceServer).Echo(ctx, req.(*EchoRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _EchoService_StreamEcho_Handler(srv interface{}, stream
grpc.ServerStream) error {
+ m := new(EchoRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(EchoServiceServer).StreamEcho(m,
&grpc.GenericServerStream[EchoRequest, EchoResponse]{ServerStream: stream})
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_StreamEchoServer = grpc.ServerStreamingServer[EchoResponse]
+
+func _EchoService_ClientStreamEcho_Handler(srv interface{}, stream
grpc.ServerStream) error {
+ return
srv.(EchoServiceServer).ClientStreamEcho(&grpc.GenericServerStream[EchoRequest,
EchoResponse]{ServerStream: stream})
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_ClientStreamEchoServer =
grpc.ClientStreamingServer[EchoRequest, EchoResponse]
+
+func _EchoService_BidirectionalEcho_Handler(srv interface{}, stream
grpc.ServerStream) error {
+ return
srv.(EchoServiceServer).BidirectionalEcho(&grpc.GenericServerStream[EchoRequest,
EchoResponse]{ServerStream: stream})
+}
+
+// This type alias is provided for backwards compatibility with existing code
that references the prior non-generic stream type by name.
+type EchoService_BidirectionalEchoServer =
grpc.BidiStreamingServer[EchoRequest, EchoResponse]
+
+// EchoService_ServiceDesc is the grpc.ServiceDesc for EchoService service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var EchoService_ServiceDesc = grpc.ServiceDesc{
+ ServiceName: "echo.EchoService",
+ HandlerType: (*EchoServiceServer)(nil),
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "Echo",
+ Handler: _EchoService_Echo_Handler,
+ },
+ },
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "StreamEcho",
+ Handler: _EchoService_StreamEcho_Handler,
+ ServerStreams: true,
+ },
+ {
+ StreamName: "ClientStreamEcho",
+ Handler: _EchoService_ClientStreamEcho_Handler,
+ ClientStreams: true,
+ },
+ {
+ StreamName: "BidirectionalEcho",
+ Handler: _EchoService_BidirectionalEcho_Handler,
+ ServerStreams: true,
+ ClientStreams: true,
+ },
+ },
+ Metadata: "echo.proto",
+}
diff --git a/grpc/reflection/server/server.go b/grpc/reflection/server/server.go
new file mode 100644
index 0000000..f1a09ad
--- /dev/null
+++ b/grpc/reflection/server/server.go
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package main implements a gRPC server with Server Reflection enabled.
+// This demonstrates the gRPC Server Reflection feature for dynamic message
parsing.
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "os"
+ "time"
+)
+
+import (
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+)
+
+import (
+ pb "github.com/dubbo-go-pixiu/samples/grpc/reflection/proto"
+)
+
+var (
+ port = flag.Int("port", 50051, "The server port")
+ serverID = flag.String("server_id", "", "Server identifier for load
balancing verification")
+)
+
+// echoServer implements the EchoService.
+type echoServer struct {
+ pb.UnimplementedEchoServiceServer
+ serverID string
+}
+
+// Echo returns the message back with additional metadata.
+func (s *echoServer) Echo(ctx context.Context, req *pb.EchoRequest)
(*pb.EchoResponse, error) {
+ log.Printf("[Echo] Received message: %s", req.Message)
+
+ return &pb.EchoResponse{
+ Message: req.Message,
+ ServerTimestamp: time.Now().UnixNano(),
+ ReflectionEnabled: true, // This server has reflection enabled
+ ServerId: s.serverID,
+ Metadata: req.Metadata,
+ }, nil
+}
+
+// StreamEcho demonstrates server streaming with reflection support.
+func (s *echoServer) StreamEcho(req *pb.EchoRequest, stream
pb.EchoService_StreamEchoServer) error {
+ log.Printf("[StreamEcho] Received message: %s", req.Message)
+
+ // Send 5 responses for demonstration
+ for i := 0; i < 5; i++ {
+ resp := &pb.EchoResponse{
+ Message: fmt.Sprintf("[%d] %s", i+1,
req.Message),
+ ServerTimestamp: time.Now().UnixNano(),
+ ReflectionEnabled: true,
+ ServerId: s.serverID,
+ Metadata: req.Metadata,
+ }
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ return nil
+}
+
+// ClientStreamEcho demonstrates client streaming with reflection support.
+func (s *echoServer) ClientStreamEcho(stream
pb.EchoService_ClientStreamEchoServer) error {
+ log.Printf("[ClientStreamEcho] Stream started")
+
+ var messages []string
+ var lastMetadata map[string]string
+
+ for {
+ req, err := stream.Recv()
+ if err == io.EOF {
+ // Client finished sending, return aggregated response
+ return stream.SendAndClose(&pb.EchoResponse{
+ Message: fmt.Sprintf("Received %d
messages: %v", len(messages), messages),
+ ServerTimestamp: time.Now().UnixNano(),
+ ReflectionEnabled: true,
+ ServerId: s.serverID,
+ Metadata: lastMetadata,
+ })
+ }
+ if err != nil {
+ return err
+ }
+
+ log.Printf("[ClientStreamEcho] Received: %s", req.Message)
+ messages = append(messages, req.Message)
+ lastMetadata = req.Metadata
+ }
+}
+
+// BidirectionalEcho demonstrates bidirectional streaming.
+func (s *echoServer) BidirectionalEcho(stream
pb.EchoService_BidirectionalEchoServer) error {
+ log.Printf("[BidirectionalEcho] Stream started")
+
+ for {
+ req, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ log.Printf("[BidirectionalEcho] Received: %s", req.Message)
+
+ resp := &pb.EchoResponse{
+ Message: req.Message,
+ ServerTimestamp: time.Now().UnixNano(),
+ ReflectionEnabled: true,
+ ServerId: s.serverID,
+ Metadata: req.Metadata,
+ }
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ }
+}
+
+func getServerID() string {
+ if *serverID != "" {
+ return *serverID
+ }
+ // Use hostname as default server ID
+ hostname, err := os.Hostname()
+ if err != nil {
+ return "unknown"
+ }
+ return hostname
+}
+
+func main() {
+ flag.Parse()
+
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
+ if err != nil {
+ log.Fatalf("failed to listen: %v", err)
+ }
+
+ // Create gRPC server with default options
+ grpcServer := grpc.NewServer()
+
+ // Register the EchoService
+ pb.RegisterEchoServiceServer(grpcServer, &echoServer{
+ serverID: getServerID(),
+ })
+
+ // IMPORTANT: Enable gRPC Server Reflection
+ // This allows Pixiu to dynamically discover and parse service methods
+ reflection.Register(grpcServer)
+
+ log.Printf("gRPC server with reflection enabled listening on port %d",
*port)
+ log.Printf("Server ID: %s", getServerID())
+
+ if err := grpcServer.Serve(lis); err != nil {
+ log.Fatalf("failed to serve: %v", err)
+ }
+}
diff --git a/grpc/reflection/test/reflection_test.go
b/grpc/reflection/test/reflection_test.go
new file mode 100644
index 0000000..aee72aa
--- /dev/null
+++ b/grpc/reflection/test/reflection_test.go
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test
+
+import (
+ "context"
+ "io"
+ "testing"
+ "time"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+import (
+ pb "github.com/dubbo-go-pixiu/samples/grpc/reflection/proto"
+)
+
+const (
+ // pixiuAddr is the default Pixiu gateway address for testing
+ pixiuAddr = "localhost:8881"
+)
+
+func getClient(t *testing.T) (pb.EchoServiceClient, func()) {
+ conn, err := grpc.NewClient(pixiuAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err, "Failed to connect to Pixiu gateway")
+
+ cleanup := func() {
+ conn.Close()
+ }
+
+ return pb.NewEchoServiceClient(conn), cleanup
+}
+
+func TestUnaryEcho(t *testing.T) {
+ client, cleanup := getClient(t)
+ defer cleanup()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ req := &pb.EchoRequest{
+ Message: "Test message for reflection",
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "test": "unary",
+ },
+ }
+
+ resp, err := client.Echo(ctx, req)
+ require.NoError(t, err, "Echo RPC should succeed")
+
+ assert.Equal(t, req.Message, resp.Message, "Message should be echoed
back")
+ assert.NotEmpty(t, resp.ServerId, "Server ID should be set")
+ assert.True(t, resp.ReflectionEnabled, "Reflection should be enabled on
server")
+ assert.Greater(t, resp.ServerTimestamp, int64(0), "Server timestamp
should be positive")
+}
+
+func TestStreamEcho(t *testing.T) {
+ client, cleanup := getClient(t)
+ defer cleanup()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ req := &pb.EchoRequest{
+ Message: "Stream test message",
+ Timestamp: time.Now().UnixNano(),
+ Metadata: map[string]string{
+ "test": "stream",
+ },
+ }
+
+ stream, err := client.StreamEcho(ctx, req)
+ require.NoError(t, err, "StreamEcho should start successfully")
+
+ messageCount := 0
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(t, err, "StreamEcho receive should not fail")
+
+ messageCount++
+ assert.Contains(t, resp.Message, req.Message, "Response should
contain original message")
+ assert.NotEmpty(t, resp.ServerId, "Server ID should be set")
+ assert.True(t, resp.ReflectionEnabled, "Reflection should be
enabled")
+ }
+
+ assert.Equal(t, 5, messageCount, "Should receive exactly 5 streamed
messages")
+}
+
+func TestClientStreamEcho(t *testing.T) {
+ client, cleanup := getClient(t)
+ defer cleanup()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ stream, err := client.ClientStreamEcho(ctx)
+ require.NoError(t, err, "ClientStreamEcho should start successfully")
+
+ messages := []string{"First", "Second", "Third"}
+
+ // Send all messages
+ for _, msg := range messages {
+ req := &pb.EchoRequest{
+ Message: msg,
+ Timestamp: time.Now().UnixNano(),
+ }
+ err := stream.Send(req)
+ require.NoError(t, err, "Send should succeed")
+ }
+
+ // Close and receive response
+ resp, err := stream.CloseAndRecv()
+ require.NoError(t, err, "CloseAndRecv should succeed")
+
+ assert.Contains(t, resp.Message, "3 messages", "Response should mention
message count")
+ assert.True(t, resp.ReflectionEnabled, "Reflection should be enabled")
+ assert.NotEmpty(t, resp.ServerId, "Server ID should be set")
+}
+
+func TestBidirectionalEcho(t *testing.T) {
+ client, cleanup := getClient(t)
+ defer cleanup()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ stream, err := client.BidirectionalEcho(ctx)
+ require.NoError(t, err, "BidirectionalEcho should start successfully")
+
+ messages := []string{"Msg1", "Msg2", "Msg3"}
+
+ // Send all messages
+ for _, msg := range messages {
+ req := &pb.EchoRequest{
+ Message: msg,
+ Timestamp: time.Now().UnixNano(),
+ }
+ err := stream.Send(req)
+ require.NoError(t, err, "Send should succeed")
+ }
+ stream.CloseSend()
+
+ // Receive all responses
+ receivedCount := 0
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(t, err, "Receive should not fail")
+
+ assert.Equal(t, messages[receivedCount], resp.Message, "Message
should match")
+ assert.True(t, resp.ReflectionEnabled, "Reflection should be
enabled")
+ receivedCount++
+ }
+
+ assert.Equal(t, len(messages), receivedCount, "Should receive same
number of messages as sent")
+}
+
+func TestEchoWithMetadata(t *testing.T) {
+ client, cleanup := getClient(t)
+ defer cleanup()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ metadata := map[string]string{
+ "key1": "value1",
+ "key2": "value2",
+ "key3": "value3",
+ }
+
+ req := &pb.EchoRequest{
+ Message: "Metadata test",
+ Timestamp: time.Now().UnixNano(),
+ Metadata: metadata,
+ }
+
+ resp, err := client.Echo(ctx, req)
+ require.NoError(t, err, "Echo should succeed")
+
+ // Verify metadata is echoed back
+ for k, v := range metadata {
+ assert.Equal(t, v, resp.Metadata[k], "Metadata key %s should be
echoed", k)
+ }
+}
diff --git a/start_integrate_test.sh b/start_integrate_test.sh
index e3a794b..5242f78 100755
--- a/start_integrate_test.sh
+++ b/start_integrate_test.sh
@@ -35,6 +35,7 @@ array=(
"http/simple"
# grpc proxy
"grpc/deprecated"
+ "grpc/reflection"
# plugins
"plugins/opa/embedded"
"plugins/opa/server-mode"