This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go-samples.git
The following commit(s) were added to refs/heads/main by this push:
new 4e87a4f7 feat: Enhance samples interaction with Dubbo-java #3096,add
dubbo-java streaming samples (#985)
4e87a4f7 is described below
commit 4e87a4f7f23a5752058f049be190b6943e9f850e
Author: 陈乐樂 <[email protected]>
AuthorDate: Sat Dec 20 15:33:22 2025 +0800
feat: Enhance samples interaction with Dubbo-java #3096,add dubbo-java
streaming samples (#985)
* feat: refactor streaming sample with Java client/server modules and
Go-Java interoperability
- Split Java implementation into separate java-client and java-server
modules
- Remove Spring Boot dependency, use pure Dubbo API
- Implement all 4 RPC modes: unary, bidirectional stream, client stream,
server stream
- Add logback configuration to reduce framework log noise
- Enhance client output with clear formatting and emojis
- Add comprehensive README documentation in both English and Chinese
- Verify full Go-Java interoperability across all streaming modes
- Add run scripts for easy execution
* docs: update README with enhanced formatting and interoperability details
- Add comprehensive project structure documentation
- Include detailed Java implementation examples
- Add clear running instructions for both Go and Java
- Enhance output examples with emojis and formatting
- Add interoperability testing matrix showing all combinations
- Update both English and Chinese versions
* change for root dir
* change for assert point
* Update direct call example in README_CN.md
Add interoperation example for direct call without registry
* Fix formatting of client stream description in README
* Update README_CN.md
Co-authored-by: Xuetao Li <[email protected]>
* FIX
* fix
* fix
---------
Co-authored-by: Xuetao Li <[email protected]>
---
README_CN.md | 2 +-
streaming/README.md | 218 +++++++++++++--
streaming/README_zh.md | 222 ++++++++++++++--
streaming/go-client/cmd/client.go | 75 ++++--
streaming/go-server/cmd/server.go | 6 +-
streaming/java-client/pom.xml | 96 +++++++
streaming/java-client/run.sh | 3 +
.../samples/tri/streaming/StreamingClient.java | 296 +++++++++++++++++++++
.../java-client/src/main/resources/logback.xml | 20 ++
streaming/java-server/pom.xml | 96 +++++++
streaming/java-server/run.sh | 3 +
.../dubbo/samples/tri/streaming/GreeterImpl.java | 143 ++++++++++
.../samples/tri/streaming/StreamingServer.java | 62 +++++
.../java-server/src/main/resources/logback.xml | 17 ++
streaming/pom.xml | 168 ++++++++++++
streaming/proto/greet.proto | 3 +
16 files changed, 1367 insertions(+), 63 deletions(-)
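
Note for readers of the diff below (illustration only, not part of the commit): the Go client added here accepts either the Go server's plain echo or the Java server's formatted greeting. A minimal sketch of that dual-acceptance check for the server-stream case follows. It assumes the Java server formats each message as "Response <i> from serverStream for <name>", as the README output in this commit suggests. The helper names `expectedServerStream` and `matchesAny` are hypothetical and do not appear in the repository.

```go
package main

import "fmt"

// expectedServerStream lists the greetings that either server may send for
// the i-th server-stream message, given the name the client sent.
// Hypothetical helper; the Java format is assumed from the README output.
func expectedServerStream(name string, i int) []string {
	return []string{
		name, // Go server: echoes the request name
		fmt.Sprintf("Response %d from serverStream for %s", i, name), // Java server (assumed format)
	}
}

// matchesAny reports whether got equals one of the accepted values.
func matchesAny(got string, accepted []string) bool {
	for _, a := range accepted {
		if got == a {
			return true
		}
	}
	return false
}

func main() {
	// Go server reply for a client that sent "triple".
	fmt.Println(matchesAny("triple", expectedServerStream("triple", 0)))
	// Java server reply (assumed format) for the same client.
	fmt.Println(matchesAny("Response 3 from serverStream for triple", expectedServerStream("triple", 3)))
	// A reply built from a different request name is rejected.
	fmt.Println(matchesAny("Response 3 from serverStream for StreamingClient", expectedServerStream("triple", 3)))
}
```

Deriving the Java-format expectation from the name the client actually sent keeps the check valid regardless of which client issues the request.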
diff --git a/README_CN.md b/README_CN.md
index d9f2f81a..d665be38 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -39,7 +39,7 @@
* `rpc/grpc`:基于 gRPC 协议的示例。
* `rpc/jsonrpc`:基于 JSON-RPC 协议的示例。
* `rpc/triple`:Triple 协议示例,涵盖多种序列化方式。
-* `streaming`:流式 RPC 调用示例, 并包含了Dubbo-go与Dubbo-java同时使用流式传输的互操作示例。
+* `streaming`:流式 RPC 调用示例,并包含了Dubbo-go与Dubbo-java同时使用流式传输的互操作示例。
* `task`:任务调度与执行示例。
* `timeout`:Dubbo-go 超时处理示例。
* `tls`:演示如何在 Dubbo-go 中使用 TLS(基于 X.509 证书),实现客户端与服务端之间的加密通信和/或双向认证。
diff --git a/streaming/README.md b/streaming/README.md
index b1c83f9e..d610614a 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -1,12 +1,21 @@
-# Dubbo-go Streaming Sample
+# Dubbo Streaming Sample
## 1. Introduction
-This sample demonstrates how to use streaming communication in Dubbo-go.
+This sample demonstrates how to use streaming communication in Dubbo, including:
+- Go language streaming implementation
+- Java language streaming implementation
+- Interoperability verification between Go and Java
-## 2. How to Use Dubbo-go Streaming Communication
+Supported streaming modes:
+- Unary Call: Single request, single response
+- Bidirectional Stream: Multiple requests, multiple responses
+- Client Stream: Multiple requests, single response
+- Server Stream: Single request, multiple responses
-To enable streaming communication for a method in the proto file, add "stream" before the parameter of the method and generate the corresponding files using proto-gen-triple.
+## 2. Proto Definition
+
+Define streaming methods in the proto file by adding the `stream` keyword before parameters that need streaming:
```protobuf
service GreetService {
@@ -17,9 +26,11 @@ service GreetService {
}
```
-Write the server handler file.
+## 3. Go Implementation
+
+### 3.1 Go Server
-Source file path: dubbo-go-sample/streaming/go-server/cmd/server.go
+Source file path: `streaming/go-server/cmd/server.go`
```go
type GreetTripleServer struct {
@@ -71,9 +82,9 @@ func (srv *GreetTripleServer) GreetServerStream(ctx
context.Context, req *greet.
}
```
-Write the client file.
+### 3.2 Go Client
-Source file path: dubbo-go-sample/streaming/go-client/cmd/client.go
+Source file path: `streaming/go-client/cmd/client.go`
```go
func main() {
@@ -182,22 +193,185 @@ func testServerStream(cli greet.GreetService) error {
}
```
-## 3. **Example** **Output**
+## 4. Java Implementation
-Start the server first and then the client. You can see the response return normally.
+### 4.1 Project Structure
```
-[start to test TRIPLE unary call]
-TRIPLE unary call resp: [triple]
-[start to test TRIPLE bidi stream]
-TRIPLE bidi stream resp: [triple]
-[start to test TRIPLE client stream]
-TRIPLE client stream resp: [triple,triple,triple,triple,triple]
-[start to test TRIPLE server stream]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
+streaming/
+├── pom.xml # Parent POM
+├── proto/ # Shared Proto files
+│ └── greet.proto
+├── java-server/ # Java Server
+│ ├── pom.xml
+│ ├── src/main/java/
+│ │ └── org/apache/dubbo/samples/tri/streaming/
+│ │ ├── StreamingServer.java
+│ │ └── GreeterImpl.java
+│ └── run.sh
+└── java-client/ # Java Client
+ ├── pom.xml
+ ├── src/main/java/
+ │ └── org/apache/dubbo/samples/tri/streaming/
+ │ └── StreamingClient.java
+ └── run.sh
+```
+
+### 4.2 Java Server
+
+Source file path: `streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/GreeterImpl.java`
+
+Implements all four RPC modes:
+
+```java
+public class GreeterImpl extends DubboGreetServiceTriple.GreetServiceImplBase {
+
+ // Unary call
+ @Override
+ public GreetResponse greet(GreetRequest request) {
+ return GreetResponse.newBuilder()
+ .setGreeting("Hello " + request.getName())
+ .build();
+ }
+
+ // Bidirectional stream
+ @Override
+ public StreamObserver<GreetStreamRequest> greetStream(
+ StreamObserver<GreetStreamResponse> responseObserver) {
+ // Bidirectional stream logic
+ }
+
+ // Client stream
+ @Override
+ public StreamObserver<GreetClientStreamRequest> greetClientStream(
+ StreamObserver<GreetClientStreamResponse> responseObserver) {
+ // Client stream logic
+ }
+
+ // Server stream
+ @Override
+ public void greetServerStream(GreetServerStreamRequest request,
+ StreamObserver<GreetServerStreamResponse> responseObserver) {
+ // Send 10 responses
+ for (int i = 0; i < 10; i++) {
+ responseObserver.onNext(response);
+ }
+ responseObserver.onCompleted();
+ }
+}
+```
+
+### 4.3 Java Client
+
+Source file path: `streaming/java-client/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingClient.java`
+
+Tests all streaming modes with clear, formatted output.
+
+## 5. Running Examples
+
+### 5.1 Run Go Server and Client
+
+```bash
+# Start Go server
+cd streaming/go-server
+go run cmd/server.go
+
+# In another terminal, start Go client
+cd streaming/go-client
+go run cmd/client.go
+```
+
+### 5.2 Run Java Server and Client
+
+```bash
+# Build parent project
+cd streaming
+mvn clean install
+
+# Start Java server
+cd java-server
+./run.sh
+# Or
+mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingServer"
+
+# In another terminal, start Java client
+cd java-client
+./run.sh
+# Or
+mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingClient"
```
+## 6. Example Output
+
+### 6.1 Java Client Output
+
+```
+======================================================================
+ Starting Dubbo Streaming Client Tests
+======================================================================
+ Connected to server: tri://127.0.0.1:20000
+======================================================================
+
+======================================================================
+ TEST 1: Bidirectional Streaming
+======================================================================
+ Sending request #0: Client-0
+ Received response #1: Echo from biStream: Client-0
+ Sending request #1: Client-1
+ Received response #2: Echo from biStream: Client-1
+ Sending request #2: Client-2
+ Received response #3: Echo from biStream: Client-2
+ Sending request #3: Client-3
+ Received response #4: Echo from biStream: Client-3
+ Sending request #4: Client-4
+ Received response #5: Echo from biStream: Client-4
+
+ All requests sent, waiting for responses...
+
+ BiStream completed - Received 5 responses
+
+======================================================================
+ TEST 2: Server Streaming
+======================================================================
+ Sending request: StreamingClient
+ Waiting for server stream responses...
+
+ Received response #1: Response 0 from serverStream for StreamingClient
+ Received response #2: Response 1 from serverStream for StreamingClient
+ ...
+ Received response #10: Response 9 from serverStream for StreamingClient
+
+ ServerStream completed - Received 10 responses
+
+======================================================================
+ TEST RESULTS SUMMARY
+======================================================================
+ Bidirectional Streaming: PASSED
+ Server Streaming: PASSED
+----------------------------------------------------------------------
+ Overall: ALL TESTS PASSED!
+======================================================================
+```
+
+### 6.2 Go Client Output
+
+```
+INFO cmd/client.go:69 start to test TRIPLE unary call
+INFO cmd/client.go:74 TRIPLE unary call resp: Hello triple
+
+INFO cmd/client.go:79 start to test TRIPLE bidi stream
+INFO cmd/client.go:91 TRIPLE bidi stream resp: Echo from biStream: triple
+
+INFO cmd/client.go:102 start to test TRIPLE client stream
+INFO cmd/client.go:116 TRIPLE client stream resp: Received 5 names: triple, triple, triple, triple, triple
+
+INFO cmd/client.go:121 start to test TRIPLE server stream
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 0 from serverStream for triple
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 1 from serverStream for triple
+...
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 9 from serverStream for triple
+```
+
+## Attention
+Do not run the Go server and the Java server at the same time: both listen on port 20000.
+
diff --git a/streaming/README_zh.md b/streaming/README_zh.md
index 7892d93a..92204a13 100644
--- a/streaming/README_zh.md
+++ b/streaming/README_zh.md
@@ -1,12 +1,21 @@
-# Dubbo-go Streaming Sample
+# Dubbo Streaming Sample
-## 1.介绍
+## 1. 介绍
-本案例演示如何在Dubbo-go中使用流式通信
+本案例演示如何在 Dubbo 中使用流式通信,包括:
+- Go 语言的流式通信实现
+- Java 语言的流式通信实现
+- Go 与 Java 之间的互通性验证
-## 2.如何使用Dubbo-go流式通信
+支持的流式模式:
+- 一元调用 (Unary): 单个请求,单个响应
+- 双向流 (Bidirectional Stream): 多个请求,多个响应
+- 客户端流 (Client Stream): 多个请求,单个响应
+- 服务端流 (Server Stream): 单个请求,多个响应
-在proto文件中需要流式通信的方法的参数前面添加stream,使用proto-gen-triple生成相应文件
+## 2. Proto 定义
+
+在 proto 文件中定义流式通信方法,在需要流式传输的参数前添加 `stream` 关键字:
```protobuf
service GreetService {
@@ -17,9 +26,11 @@ service GreetService {
}
```
-编写服务端handler文件
+## 3. Go 语言实现
+
+### 3.1 Go 服务端
-源文件路径: dubbo-go-sample/streaming/go-server/cmd/server.go
+源文件路径: `streaming/go-server/cmd/server.go`
```go
type GreetTripleServer struct {
@@ -71,9 +82,9 @@ func (srv *GreetTripleServer) GreetServerStream(ctx
context.Context, req *greet.
}
```
-编写客户端client文件
+### 3.2 Go 客户端
-源文件路径: dubbo-go-sample/streaming/go-client/cmd/client.go
+源文件路径: `streaming/go-client/cmd/client.go`
```go
func main() {
@@ -180,22 +191,187 @@ func testServerStream(cli greet.GreetService) error {
}
```
-## 3.案例效果
+## 4. Java 语言实现
+
+### 4.1 项目结构
+
+```
+streaming/
+├── pom.xml # 父 POM
+├── proto/ # 共享 Proto 文件
+│ └── greet.proto
+├── java-server/ # Java 服务端
+│ ├── pom.xml
+│ ├── src/main/java/
+│ │ └── org/apache/dubbo/samples/tri/streaming/
+│ │ ├── StreamingServer.java
+│ │ └── GreeterImpl.java
+│ └── run.sh
+└── java-client/ # Java 客户端
+ ├── pom.xml
+ ├── src/main/java/
+ │ └── org/apache/dubbo/samples/tri/streaming/
+ │ └── StreamingClient.java
+ └── run.sh
+```
+
+### 4.2 Java 服务端
+
+源文件路径: `streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/GreeterImpl.java`
+
+实现了所有四种 RPC 模式:
+
+```java
+public class GreeterImpl extends DubboGreetServiceTriple.GreetServiceImplBase {
+
+ // 一元调用
+ @Override
+ public GreetResponse greet(GreetRequest request) {
+ return GreetResponse.newBuilder()
+ .setGreeting("Hello " + request.getName())
+ .build();
+ }
+
+ // 双向流
+ @Override
+ public StreamObserver<GreetStreamRequest> greetStream(
+ StreamObserver<GreetStreamResponse> responseObserver) {
+ // 实现双向流逻辑
+ }
+
+ // 客户端流
+ @Override
+ public StreamObserver<GreetClientStreamRequest> greetClientStream(
+ StreamObserver<GreetClientStreamResponse> responseObserver) {
+ // 实现客户端流逻辑
+ }
+
+ // 服务端流
+ @Override
+ public void greetServerStream(GreetServerStreamRequest request,
+ StreamObserver<GreetServerStreamResponse> responseObserver) {
+ // 发送 10 个响应
+ for (int i = 0; i < 10; i++) {
+ responseObserver.onNext(response);
+ }
+ responseObserver.onCompleted();
+ }
+}
+```
+
+### 4.3 Java 客户端
+
+源文件路径: `streaming/java-client/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingClient.java`
+
+测试所有流式模式,输出格式清晰易读。
+
+## 5. 运行示例
+
+### 5.1 运行 Go 服务端和客户端
-运行服务端和客户端,可以看到请求正常返回
+```bash
+# 启动 Go 服务端
+cd streaming/go-server
+go run cmd/server.go
+# 在另一个终端启动 Go 客户端
+cd streaming/go-client
+go run cmd/client.go
```
-[start to test TRIPLE unary call]
-TRIPLE unary call resp: [triple]
-[start to test TRIPLE bidi stream]
-TRIPLE bidi stream resp: [triple]
-[start to test TRIPLE client stream]
-TRIPLE client stream resp: [triple,triple,triple,triple,triple]
-[start to test TRIPLE server stream]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
-TRIPLE server stream resp: [triple]
+
+### 5.2 运行 Java 服务端和客户端
+
+```bash
+# 编译父项目
+cd streaming
+mvn clean install
+
+# 启动 Java 服务端
+cd java-server
+./run.sh
+# 或者
+mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingServer"
+
+# 在另一个终端启动 Java 客户端
+cd java-client
+./run.sh
+# 或者
+mvn compile exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingClient"
```
+## 6. 案例效果
+
+### 6.1 Java 客户端输出示例
+
+```
+======================================================================
+ Starting Dubbo Streaming Client Tests
+======================================================================
+ Connected to server: tri://127.0.0.1:20000
+======================================================================
+
+======================================================================
+ TEST 1: Bidirectional Streaming
+======================================================================
+ Sending request #0: Client-0
+ Received response #1: Echo from biStream: Client-0
+ Sending request #1: Client-1
+ Received response #2: Echo from biStream: Client-1
+ Sending request #2: Client-2
+ Received response #3: Echo from biStream: Client-2
+ Sending request #3: Client-3
+ Received response #4: Echo from biStream: Client-3
+ Sending request #4: Client-4
+ Received response #5: Echo from biStream: Client-4
+
+ All requests sent, waiting for responses...
+
+ BiStream completed - Received 5 responses
+
+======================================================================
+ TEST 2: Server Streaming
+======================================================================
+ Sending request: StreamingClient
+ Waiting for server stream responses...
+
+ Received response #1: Response 0 from serverStream for StreamingClient
+ Received response #2: Response 1 from serverStream for StreamingClient
+ ...
+ Received response #10: Response 9 from serverStream for StreamingClient
+
+ ServerStream completed - Received 10 responses
+
+======================================================================
+ TEST RESULTS SUMMARY
+======================================================================
+ Bidirectional Streaming: PASSED
+ Server Streaming: PASSED
+----------------------------------------------------------------------
+ Overall: ALL TESTS PASSED!
+======================================================================
+```
+
+### 6.2 Go 客户端输出示例
+
+```
+INFO cmd/client.go:69 start to test TRIPLE unary call
+INFO cmd/client.go:74 TRIPLE unary call resp: Hello triple
+
+INFO cmd/client.go:79 start to test TRIPLE bidi stream
+INFO cmd/client.go:91 TRIPLE bidi stream resp: Echo from biStream: triple
+
+INFO cmd/client.go:102 start to test TRIPLE client stream
+INFO cmd/client.go:116 TRIPLE client stream resp: Received 5 names: triple, triple, triple, triple, triple
+
+INFO cmd/client.go:121 start to test TRIPLE server stream
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 0 from serverStream for triple
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 1 from serverStream for triple
+...
+INFO cmd/client.go:127 TRIPLE server stream resp: Response 9 from serverStream for triple
+```
+
+## 注意
+
+由于Golang-server和Java-server同时监听localhost:20000端口,你不能同时启动两个服务端
+
+
diff --git a/streaming/go-client/cmd/client.go b/streaming/go-client/cmd/client.go
index d94b3792..559c0a5f 100644
--- a/streaming/go-client/cmd/client.go
+++ b/streaming/go-client/cmd/client.go
@@ -19,6 +19,7 @@ package main
import (
"context"
+ "fmt"
)
import (
@@ -37,58 +38,76 @@ func main() {
client.WithClientURL("tri://127.0.0.1:20000"),
)
if err != nil {
- panic(err)
+ panic(err) // fail fast: client not created
}
svc, err := greet.NewGreetService(cli)
if err != nil {
- panic(err)
+ panic(err) // fail fast: service proxy not created
}
TestClient(svc)
}
func TestClient(cli greet.GreetService) {
if err := testUnary(cli); err != nil {
- logger.Error(err)
+ panic(err)
}
if err := testBidiStream(cli); err != nil {
- logger.Error(err)
+ panic(err)
}
if err := testClientStream(cli); err != nil {
- logger.Error(err)
+ panic(err)
}
if err := testServerStream(cli); err != nil {
- logger.Error(err)
+ panic(err)
}
}
+// testUnary: 1 request -> 1 response
func testUnary(cli greet.GreetService) error {
logger.Info("start to test TRIPLE unary call")
resp, err := cli.Greet(context.Background(), &greet.GreetRequest{Name: "triple"})
if err != nil {
return err
}
+ if resp == nil {
+ return fmt.Errorf("unexpected unary resp: <nil>")
+ }
logger.Infof("TRIPLE unary call resp: %s", resp.Greeting)
+ if resp.Greeting != "triple" && resp.Greeting != "Hello triple" {
+ return fmt.Errorf("unexpected unary resp: %+v", resp)
+ }
return nil
}
+// testBidiStream: N requests -> N responses (1:1)
func testBidiStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE bidi stream")
stream, err := cli.GreetStream(context.Background())
if err != nil {
return err
}
- if sendErr := stream.Send(&greet.GreetStreamRequest{Name: "triple"}); sendErr != nil {
- return err
- }
- resp, err := stream.Recv()
- if err != nil {
- return err
+ names := []string{"triple-1", "triple-2", "triple-3"}
+ for _, name := range names {
+ if sendErr := stream.Send(&greet.GreetStreamRequest{Name: name}); sendErr != nil {
+ return sendErr
+ }
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ return recvErr
+ }
+ if resp == nil {
+ return fmt.Errorf("unexpected bidi resp: <nil>")
+ }
+ expectedJava := fmt.Sprintf("Echo from biStream: %s", name)
+ if resp.Greeting != name && resp.Greeting != expectedJava {
+ return fmt.Errorf("unexpected bidi resp, expect %s or %s got %+v", name, expectedJava, resp)
+ }
+ logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
}
- logger.Infof("TRIPLE bidi stream resp: %s", resp.Greeting)
if err := stream.CloseRequest(); err != nil {
return err
}
@@ -98,6 +117,7 @@ func testBidiStream(cli greet.GreetService) error {
return nil
}
+// testClientStream: 5 requests -> 1 response
func testClientStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE client stream")
stream, err := cli.GreetClientStream(context.Background())
@@ -106,28 +126,51 @@ func testClientStream(cli greet.GreetService) error {
}
for i := 0; i < 5; i++ {
if sendErr := stream.Send(&greet.GreetClientStreamRequest{Name: "triple"}); sendErr != nil {
- return err
+ return sendErr
}
}
resp, err := stream.CloseAndRecv()
if err != nil {
return err
}
+ if resp == nil {
+ return fmt.Errorf("unexpected client stream resp: <nil>")
+ }
logger.Infof("TRIPLE client stream resp: %s", resp.Greeting)
+ expectedGo := "triple,triple,triple,triple,triple"
+ expectedJavaPrefix := "Received 5 names: triple, triple, triple, triple, triple"
+ if resp.Greeting != expectedGo && resp.Greeting != expectedJavaPrefix {
+ return fmt.Errorf("unexpected client stream resp: %+v", resp)
+ }
return nil
}
+// testServerStream: 1 request -> 10 responses
func testServerStream(cli greet.GreetService) error {
logger.Info("start to test TRIPLE server stream")
stream, err := cli.GreetServerStream(context.Background(), &greet.GreetServerStreamRequest{Name: "triple"})
if err != nil {
return err
}
+ count := 0
for stream.Recv() {
- logger.Infof("TRIPLE server stream resp: %s", stream.Msg().Greeting)
+ msg := stream.Msg()
+ if msg == nil {
+ return fmt.Errorf("unexpected server stream msg: <nil>")
+ }
+ expectedGo := "triple"
+ expectedJava := fmt.Sprintf("Response %d from serverStream for triple", count)
+ if msg.Greeting != expectedGo && msg.Greeting != expectedJava {
+ return fmt.Errorf("unexpected server stream msg: %+v", msg)
+ }
+ count++
+ logger.Infof("TRIPLE server stream resp #%d: %s", count, msg.Greeting)
}
if stream.Err() != nil {
- return err
+ return stream.Err()
+ }
+ if count != 10 {
+ return fmt.Errorf("unexpected server stream count, expect 10 got %d", count)
}
if err := stream.Close(); err != nil {
return err
diff --git a/streaming/go-server/cmd/server.go b/streaming/go-server/cmd/server.go
index 0f4e45a6..4ee6357b 100644
--- a/streaming/go-server/cmd/server.go
+++ b/streaming/go-server/cmd/server.go
@@ -56,11 +56,13 @@ func main() {
type GreetTripleServer struct {
}
+// Greet: 1 request -> 1 response, returns request name (echo)
func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
resp := &greet.GreetResponse{Greeting: req.Name}
return resp, nil
}
+// GreetStream: N requests -> N responses (1:1), returns request name (echo)
func (srv *GreetTripleServer) GreetStream(ctx context.Context, stream greet.GreetService_GreetStreamServer) error {
for {
req, err := stream.Recv()
@@ -77,6 +79,7 @@ func (srv *GreetTripleServer) GreetStream(ctx
context.Context, stream greet.Gree
return nil
}
+// GreetClientStream: N requests -> 1 response, returns names joined by "," (e.g. "a,b,c")
func (srv *GreetTripleServer) GreetClientStream(ctx context.Context, stream greet.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
var reqs []string
for stream.Recv() {
@@ -92,8 +95,9 @@ func (srv *GreetTripleServer) GreetClientStream(ctx
context.Context, stream gree
return resp, nil
}
+// GreetServerStream: 1 request -> 10 responses, returns request name (echo)
func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
- for i := 0; i < 5; i++ {
+ for i := 0; i < 10; i++ {
if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: req.Name}); err != nil {
return fmt.Errorf("triple ServerStream send err: %s", err)
}
diff --git a/streaming/java-client/pom.xml b/streaming/java-client/pom.xml
new file mode 100644
index 00000000..b2898d67
--- /dev/null
+++ b/streaming/java-client/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-samples-streaming-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>dubbo-samples-streaming-client</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Dubbo Samples Streaming Client</name>
+ <description>Dubbo Triple Protocol Streaming Client Implementation</description>
+
+ <dependencies>
+ <!-- Dubbo Core -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ </dependency>
+
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <!-- Netty for network transport -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <!-- JUnit 5 for testing -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Compiler Plugin -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <!-- Protobuf Plugin -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ </plugin>
+
+ <!-- Exec Plugin for running the client -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>org.apache.dubbo.samples.tri.streaming.StreamingClient</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/java-client/run.sh b/streaming/java-client/run.sh
new file mode 100644
index 00000000..8aa4c34e
--- /dev/null
+++ b/streaming/java-client/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+mvn clean compile
+mvn exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingClient"
diff --git a/streaming/java-client/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingClient.java b/streaming/java-client/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingClient.java
new file mode 100644
index 00000000..4de5bd7e
--- /dev/null
+++ b/streaming/java-client/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingClient.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.tri.streaming;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.tri.streaming.api.GreetService;
+import org.apache.dubbo.samples.tri.streaming.api.GreetStreamRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetStreamResponse;
+import org.apache.dubbo.samples.tri.streaming.api.GreetServerStreamRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetServerStreamResponse;
+
+import org.apache.dubbo.samples.tri.streaming.api.GreetRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StreamingClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamingClient.class);
+
+ private static final String SERVER_URL = "tri://127.0.0.1:20000";
+ private static final int TIMEOUT_SECONDS = 30;
+
+ public static void main(String[] args) {
+ System.out.println("\n" + "=".repeat(70));
+ System.out.println("Starting Dubbo Streaming Client Tests");
+ System.out.println("=".repeat(70));
+
+ // Configure reference to the remote service
+ ReferenceConfig<GreetService> reference = new ReferenceConfig<>();
+ reference.setInterface(GreetService.class);
+ reference.setUrl(SERVER_URL);
+ reference.setTimeout(10000);
+
+ // Start Dubbo Bootstrap
+ DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+ bootstrap.application(new ApplicationConfig("streaming-client"))
+ .reference(reference)
+ .start();
+
+ System.out.println("Connected to server: " + SERVER_URL);
+ System.out.println("=".repeat(70) + "\n");
+
+ // Get the service proxy
+ GreetService greeter = reference.get();
+
+ // Test results tracking
+ boolean bidiStreamSuccess = false;
+ boolean serverStreamSuccess = false;
+ boolean unarySuccess = false;
+
+ try {
+ // Unary call quick check
+ System.out.println("\n" + "=".repeat(70));
+ System.out.println("TEST 0: Unary");
+ System.out.println("=".repeat(70));
+ unarySuccess = testUnary(greeter);
+
+ // Test bidirectional streaming
+ System.out.println("\n" + "=".repeat(70));
+ System.out.println("TEST 1: Bidirectional Streaming");
+ System.out.println("=".repeat(70));
+ bidiStreamSuccess = testBidiStream(greeter);
+
+ // Test server streaming
+ System.out.println("\n" + "=".repeat(70));
+ System.out.println("TEST 2: Server Streaming");
+ System.out.println("=".repeat(70));
+ serverStreamSuccess = testServerStream(greeter);
+
+ } catch (Exception e) {
+ LOGGER.error("Error during testing", e);
+ printTestSummary(unarySuccess, bidiStreamSuccess, serverStreamSuccess);
+ bootstrap.stop();
+ System.exit(1);
+ } finally {
+ // Print test results summary
+ printTestSummary(unarySuccess, bidiStreamSuccess, serverStreamSuccess);
+ // Shutdown
+ bootstrap.stop();
+ System.out.println("\n Client shutdown complete\n");
+ }
+
+ if (!unarySuccess || !bidiStreamSuccess || !serverStreamSuccess) {
+ System.exit(1);
+ }
+ }
+
+ // Unary quick check
+ private static boolean testUnary(GreetService greeter) throws Exception {
+ GreetRequest request = GreetRequest.newBuilder().setName("triple").build();
+ GreetResponse response = greeter.greet(request);
+ String expectedGreetingJava = "Hello triple";
+ String expectedGreetingGo = "triple";
+ if (response == null || (!expectedGreetingJava.equals(response.getGreeting()) && !expectedGreetingGo.equals(response.getGreeting()))) {
+ throw new IllegalStateException("Unexpected unary response, expect \"" + expectedGreetingJava + "\" or \"" + expectedGreetingGo + "\" got: " + response);
+ }
+ System.out.println(" Unary call response: " + response.getGreeting());
+ return true;
+ }
+
+ // BiStream test: send 5 requests, expect 5 matching responses
+ private static boolean testBidiStream(GreetService greeter) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger responseCount = new AtomicInteger(0);
+ final int expectedResponses = 5;
+ final AtomicBoolean success = new AtomicBoolean(false);
+ final AtomicReference<Throwable> failureRef = new AtomicReference<>();
+ final String[] expectedNames = {"Client-0", "Client-1", "Client-2", "Client-3", "Client-4"};
+
+ try {
+ // Create response observer
+ StreamObserver<GreetStreamResponse> responseObserver = new StreamObserver<GreetStreamResponse>() {
+ @Override
+ public void onNext(GreetStreamResponse response) {
+ int count = responseCount.incrementAndGet();
+ // Validate before dereferencing: reject a null or surplus response
+ if (response == null || count > expectedNames.length) {
+ failureRef.set(new IllegalStateException("Unexpected bidi resp at #" + count + ", got " + response));
+ latch.countDown();
+ return;
+ }
+ String name = expectedNames[count - 1];
+ String expectedJava = "Echo from biStream: " + name;
+ System.out.println(" Received response #" + count + ": " + response.getGreeting());
+ if (!expectedJava.equals(response.getGreeting()) && !name.equals(response.getGreeting())) {
+ failureRef.set(new IllegalStateException("Unexpected bidi resp at #" + count + ", expect \"" + expectedJava + "\" or \"" + name + "\" got " + response));
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.err.println(" BiStream error: " + throwable.getMessage());
+ failureRef.set(throwable);
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("\n BiStream completed - Received " + responseCount.get() + " responses");
+ success.set(responseCount.get() == expectedResponses && failureRef.get() == null);
+ latch.countDown();
+ }
+ };
+
+ // Get request observer
+ StreamObserver<GreetStreamRequest> requestObserver = greeter.greetStream(responseObserver);
+
+ // Send multiple requests
+ for (int i = 0; i < expectedResponses; i++) {
+ GreetStreamRequest request = GreetStreamRequest.newBuilder()
+ .setName(expectedNames[i])
+ .build();
+ System.out.println(" Sending request #" + i + ": " + request.getName());
+ requestObserver.onNext(request);
+
+ // Small delay between requests
+ Thread.sleep(100);
+ }
+
+ // Signal completion
+ requestObserver.onCompleted();
+ System.out.println("\n All requests sent, waiting for responses...");
+
+ // Wait for responses
+ if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("BiStream test timed out");
+ }
+
+ if (failureRef.get() != null) {
+ throw new IllegalStateException("BiStream failed", failureRef.get());
+ }
+ if (responseCount.get() != expectedResponses) {
+ throw new IllegalStateException("BiStream response count mismatch, expect " + expectedResponses + " got " + responseCount.get());
+ }
+
+ return success.get();
+
+ } catch (Exception e) {
+ LOGGER.error("Error in testBidiStream", e);
+ throw e;
+ }
+ }
+
+ // ServerStream test: send 1 request, expect 10 responses
+ private static boolean testServerStream(GreetService greeter) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger responseCount = new AtomicInteger(0);
+ final int expectedResponses = 10;
+ final AtomicBoolean success = new AtomicBoolean(false);
+ final AtomicReference<Throwable> failureRef = new AtomicReference<>();
+
+ try {
+ // Create request
+ GreetServerStreamRequest request = GreetServerStreamRequest.newBuilder()
+ .setName("StreamingClient")
+ .build();
+
+ System.out.println(" Sending request: " + request.getName());
+ System.out.println(" Waiting for server stream responses...\n");
+
+ // Create response observer
+ StreamObserver<GreetServerStreamResponse> responseObserver = new StreamObserver<GreetServerStreamResponse>() {
+ @Override
+ public void onNext(GreetServerStreamResponse response) {
+ int count = responseCount.incrementAndGet();
+ // Validate before dereferencing the response
+ if (response == null) {
+ failureRef.set(new IllegalStateException("Unexpected null server stream resp at #" + count));
+ latch.countDown();
+ return;
+ }
+ System.out.println(" Received response #" + count + ": " + response.getGreeting());
+ String expectedJava = "Response " + (count - 1) + " from serverStream for " + request.getName();
+ String expectedGo = request.getName();
+ if (!expectedJava.equals(response.getGreeting()) && !expectedGo.equals(response.getGreeting())) {
+ failureRef.set(new IllegalStateException("Unexpected server stream resp at #" + count + ", expect \"" + expectedJava + "\" or \"" + expectedGo + "\" got " + response));
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.err.println(" ServerStream error: " + throwable.getMessage());
+ failureRef.set(throwable);
+ latch.countDown(); // Ensure latch is counted down on error
+ }
+
+ @Override
+ public void onCompleted() {
+ System.out.println("\n ServerStream completed - Received " + responseCount.get() + " responses");
+ success.set(responseCount.get() == expectedResponses && failureRef.get() == null);
+ latch.countDown();
+ }
+ };
+
+ // Call server streaming method
+ greeter.greetServerStream(request, responseObserver);
+
+ // Wait for responses
+ if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("ServerStream test timed out");
+ }
+
+ if (failureRef.get() != null) {
+ throw new IllegalStateException("ServerStream failed", failureRef.get());
+ }
+ if (responseCount.get() != expectedResponses) {
+ throw new IllegalStateException("ServerStream response count mismatch, expect " + expectedResponses + " got " + responseCount.get());
+ }
+
+ return success.get();
+
+ } catch (Exception e) {
+ LOGGER.error("Error in testServerStream", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Print test results summary.
+ *
+ * ASSERT POINT for integration testing:
+ * - Integration test can check exit behavior based on unarySuccess && bidiStreamSuccess && serverStreamSuccess
+ *
+ * @param unarySuccess whether unary test passed
+ * @param bidiStreamSuccess whether bidirectional streaming test passed
+ * @param serverStreamSuccess whether server streaming test passed
+ */
+ private static void printTestSummary(boolean unarySuccess, boolean bidiStreamSuccess, boolean serverStreamSuccess) {
+ System.out.println("\n" + "=".repeat(70));
+ System.out.println(" TEST RESULTS SUMMARY");
+ System.out.println("=".repeat(70));
+ System.out.println(" Unary: " + (unarySuccess ? " PASSED" : " FAILED"));
+ System.out.println(" Bidirectional Streaming: " + (bidiStreamSuccess ? " PASSED" : " FAILED"));
+ System.out.println(" Server Streaming: " + (serverStreamSuccess ? " PASSED" : " FAILED"));
+ System.out.println("-".repeat(70));
+ if (unarySuccess && bidiStreamSuccess && serverStreamSuccess) {
+ System.out.println(" Overall: ALL TESTS PASSED!");
+ } else {
+ System.out.println(" Overall: SOME TESTS FAILED");
+ }
+ System.out.println("=".repeat(70));
+ }
+}
diff --git a/streaming/java-client/src/main/resources/logback.xml b/streaming/java-client/src/main/resources/logback.xml
new file mode 100644
index 00000000..383f5468
--- /dev/null
+++ b/streaming/java-client/src/main/resources/logback.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <!-- Console appender -->
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- Set Dubbo framework logs to WARN level to reduce noise -->
+ <logger name="org.apache.dubbo" level="WARN"/>
+ <logger name="io.netty" level="WARN"/>
+
+ <!-- Keep application logs at INFO level -->
+ <logger name="org.apache.dubbo.samples.tri.streaming" level="INFO"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+</configuration>
diff --git a/streaming/java-server/pom.xml b/streaming/java-server/pom.xml
new file mode 100644
index 00000000..d7bc66ea
--- /dev/null
+++ b/streaming/java-server/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-samples-streaming-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>dubbo-samples-streaming-server</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Dubbo Samples Streaming Server</name>
+ <description>Dubbo Triple Protocol Streaming Server Implementation</description>
+
+ <dependencies>
+ <!-- Dubbo -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ </dependency>
+
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <!-- Netty -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <!-- JUnit 5 -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Compiler Plugin -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+
+ <!-- Protobuf Plugin -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ </plugin>
+
+ <!-- run the server -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>org.apache.dubbo.samples.tri.streaming.StreamingServer</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/streaming/java-server/run.sh b/streaming/java-server/run.sh
new file mode 100644
index 00000000..cad7c4ed
--- /dev/null
+++ b/streaming/java-server/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+mvn clean compile
+mvn exec:java -Dexec.mainClass="org.apache.dubbo.samples.tri.streaming.StreamingServer"
diff --git a/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/GreeterImpl.java b/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/GreeterImpl.java
new file mode 100644
index 00000000..d8ebddc7
--- /dev/null
+++ b/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/GreeterImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.tri.streaming;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.samples.tri.streaming.api.DubboGreetServiceTriple;
+import org.apache.dubbo.samples.tri.streaming.api.GreetRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetResponse;
+import org.apache.dubbo.samples.tri.streaming.api.GreetStreamRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetStreamResponse;
+import org.apache.dubbo.samples.tri.streaming.api.GreetClientStreamRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetClientStreamResponse;
+import org.apache.dubbo.samples.tri.streaming.api.GreetServerStreamRequest;
+import org.apache.dubbo.samples.tri.streaming.api.GreetServerStreamResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class GreeterImpl extends DubboGreetServiceTriple.GreetServiceImplBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GreeterImpl.class);
+
+ // Unary: 1 request -> 1 response, returns "Hello {name}"
+ @Override
+ public GreetResponse greet(GreetRequest request) {
+ LOGGER.info("Received unary request: {}", request.getName());
+
+ GreetResponse response = GreetResponse.newBuilder()
+ .setGreeting("Hello " + request.getName())
+ .build();
+
+ LOGGER.info("Sending unary response: {}", response.getGreeting());
+ return response;
+ }
+
+ // BiStream: N requests -> N responses (1:1), returns "Echo from biStream: {name}"
+ @Override
+ public StreamObserver<GreetStreamRequest> greetStream(StreamObserver<GreetStreamResponse> responseObserver) {
+ return new StreamObserver<GreetStreamRequest>() {
+ @Override
+ public void onNext(GreetStreamRequest request) {
+ try {
+ LOGGER.info("Received biStream request: {}", request.getName());
+
+ // Echo back the request with the same name
+ GreetStreamResponse response = GreetStreamResponse.newBuilder()
+ .setGreeting("Echo from biStream: " + request.getName())
+ .build();
+
+ responseObserver.onNext(response);
+ } catch (Exception e) {
+ LOGGER.error("Error processing biStream request", e);
+ responseObserver.onError(e);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("BiStream error occurred", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("BiStream completed");
+ responseObserver.onCompleted();
+ }
+ };
+ }
+
+ // ClientStream: N requests -> 1 response, returns "Received {count} names: {name1}, {name2}, ..."
+ @Override
+ public StreamObserver<GreetClientStreamRequest> greetClientStream(
+ StreamObserver<GreetClientStreamResponse> responseObserver) {
+ return new StreamObserver<GreetClientStreamRequest>() {
+ private final List<String> names = new ArrayList<>();
+
+ @Override
+ public void onNext(GreetClientStreamRequest request) {
+ LOGGER.info("Received clientStream request: {}", request.getName());
+ names.add(request.getName());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("ClientStream error occurred", throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ // Aggregate all received names and send a single response
+ String greeting = "Received " + names.size() + " names: " + String.join(", ", names);
+ GreetClientStreamResponse response = GreetClientStreamResponse.newBuilder()
+ .setGreeting(greeting)
+ .build();
+
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ LOGGER.info("ClientStream completed with {} requests", names.size());
+ }
+ };
+ }
+
+ // ServerStream: 1 request -> 10 responses, returns "Response {i} from serverStream for {name}"
+ @Override
+ public void greetServerStream(GreetServerStreamRequest request, StreamObserver<GreetServerStreamResponse> responseObserver) {
+ try {
+ LOGGER.info("Received serverStream request: {}", request.getName());
+
+ for (int i = 0; i < 10; i++) {
+ GreetServerStreamResponse response = GreetServerStreamResponse.newBuilder()
+ .setGreeting("Response " + i + " from serverStream for " + request.getName())
+ .build();
+
+ responseObserver.onNext(response);
+ LOGGER.debug("Sent serverStream response {}", i);
+ }
+
+ responseObserver.onCompleted();
+ LOGGER.info("ServerStream completed for request: {}", request.getName());
+
+ } catch (Exception e) {
+ LOGGER.error("Error in serverStream", e);
+ responseObserver.onError(e);
+ }
+ }
+}
diff --git a/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingServer.java b/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingServer.java
new file mode 100644
index 00000000..f1024564
--- /dev/null
+++ b/streaming/java-server/src/main/java/org/apache/dubbo/samples/tri/streaming/StreamingServer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.tri.streaming;
+
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.tri.streaming.api.GreetService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamingServer.class);
+
+ private static final int SERVER_PORT = 20000;
+ private static final String PROTOCOL_NAME = "tri";
+ private static final String APPLICATION_NAME = "streaming-server";
+
+ public static void main(String[] args) {
+ try {
+ LOGGER.info("Starting Dubbo Streaming Server...");
+
+ // Create service configuration
+ ServiceConfig<GreetService> service = new ServiceConfig<>();
+ service.setInterface(GreetService.class);
+ service.setRef(new GreeterImpl());
+
+ // Configure and start Dubbo Bootstrap
+ DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+ bootstrap.application(new ApplicationConfig(APPLICATION_NAME))
+ .protocol(new ProtocolConfig(PROTOCOL_NAME, SERVER_PORT))
+ .service(service)
+ .start();
+
+ LOGGER.info("Dubbo Streaming Server started successfully on port {}", SERVER_PORT);
+ LOGGER.info("Server is ready to accept connections...");
+
+ // Keep the server running
+ bootstrap.await();
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to start Dubbo Streaming Server", e);
+ System.exit(1);
+ }
+ }
+}
diff --git a/streaming/java-server/src/main/resources/logback.xml b/streaming/java-server/src/main/resources/logback.xml
new file mode 100644
index 00000000..9967b595
--- /dev/null
+++ b/streaming/java-server/src/main/resources/logback.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.dubbo" level="WARN"/>
+ <logger name="io.netty" level="WARN"/>
+
+ <logger name="org.apache.dubbo.samples.tri.streaming" level="INFO"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+</configuration>
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 00000000..da354457
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,168 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-samples-streaming-parent</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+
+ <name>Dubbo Samples Streaming Parent</name>
+ <description>Parent POM for Dubbo Triple Protocol Streaming Samples</description>
+
+ <modules>
+ <module>java-client</module>
+ <module>java-server</module>
+ </modules>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+ <dubbo.version>3.3.1</dubbo.version>
+ <protobuf.version>3.25.5</protobuf.version>
+ <netty.version>4.1.100.Final</netty.version>
+ <slf4j.version>1.7.36</slf4j.version>
+ <logback.version>1.2.11</logback.version>
+ <junit.version>5.9.3</junit.version>
+
+ <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
+ <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
+ <os-maven-plugin.version>1.6.1</os-maven-plugin.version>
+ <exec-maven-plugin.version>3.1.0</exec-maven-plugin.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- Dubbo BOM -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-bom</artifactId>
+ <version>${dubbo.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+
+ <!-- Dubbo Core -->
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ <version>${dubbo.version}</version>
+ </dependency>
+
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+
+ <!-- Netty for network transport -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+
+ <!-- JUnit 5 for testing -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <!-- Compiler Plugin -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+
+ <!-- Protobuf Plugin -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <protoSourceRoot>${project.basedir}/../proto</protoSourceRoot>
+ <protocPlugins>
+ <protocPlugin>
+ <id>dubbo</id>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-compiler</artifactId>
+ <version>${dubbo.version}</version>
+ <mainClass>org.apache.dubbo.gen.tri.Dubbo3TripleGenerator</mainClass>
+ </protocPlugin>
+ </protocPlugins>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- main classes -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${exec-maven-plugin.version}</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <extensions>
+ <!-- OS Maven -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ </extension>
+ </extensions>
+ </build>
+</project>
diff --git a/streaming/proto/greet.proto b/streaming/proto/greet.proto
index ccd41557..439678f6 100644
--- a/streaming/proto/greet.proto
+++ b/streaming/proto/greet.proto
@@ -20,6 +20,9 @@ syntax = "proto3";
package greet;
option go_package = "github.com/apache/dubbo-go-samples/streaming/proto;greet";
+option java_multiple_files = true;
+option java_package = "org.apache.dubbo.samples.tri.streaming.api";
+option java_outer_classname = "GreetProto";
message GreetRequest {
string name = 1;