This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a2bdeadc [ISSUE #5195] write data directly when there is no topicsList of publishing (#5196)
4a2bdeadc is described below

commit 4a2bdeadceba8f49bc737335d245fda975c4e7c0
Author: lk <[email protected]>
AuthorDate: Wed Sep 28 11:44:12 2022 +0800

    [ISSUE #5195] write data directly when there is no topicsList of publishing (#5196)
---
 .../rocketmq/proxy/grpc/v2/client/ClientActivity.java       |  8 ++++++--
 .../rocketmq/proxy/grpc/v2/client/ClientActivityTest.java   | 13 +++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 352e98d81..2192014b5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -253,14 +253,18 @@ public class ClientActivity extends AbstractMessingActivity {
             default:
                 break;
         }
-        if (grpcClientChannel == null) {
+        if (Settings.PubSubCase.PUBSUB_NOT_SET.equals(settings.getPubSubCase())) {
             responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT
                 .withDescription("there is no publishing or subscription data in settings")
                 .asRuntimeException());
             return;
         }
         TelemetryCommand command = processClientSettings(ctx, request);
-        grpcClientChannel.writeTelemetryCommand(command);
+        if (grpcClientChannel != null) {
+            grpcClientChannel.writeTelemetryCommand(command);
+        } else {
+            responseObserver.onNext(command);
+        }
     }
 
     protected TelemetryCommand processClientSettings(ProxyContext ctx, TelemetryCommand request) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index 8d9089f88..ea045774f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -322,6 +322,19 @@ public class ClientActivityTest extends BaseActivityTest {
         }
     }
 
+    @Test
+    public void testEmptyProducerSettings() throws Throwable {
+        ProxyContext context = createContext();
+        TelemetryCommand command = this.sendClientTelemetry(
+            context,
+            Settings.newBuilder()
+                .setClientType(ClientType.PRODUCER)
+                .setPublishing(Publishing.getDefaultInstance())
+                .build()).get();
+        assertTrue(command.hasSettings());
+        assertTrue(command.getSettings().hasPublishing());
+    }
+
     @Test
     public void testReportThreadStackTrace() {
         this.clientActivity = new ClientActivity(this.messagingProcessor, this.grpcClientSettingsManager, grpcChannelManagerMock);

Reply via email to