This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4a2bdeadc [ISSUE #5195] write data directly when there is no topicsList of publishing (#5196)
4a2bdeadc is described below
commit 4a2bdeadceba8f49bc737335d245fda975c4e7c0
Author: lk <[email protected]>
AuthorDate: Wed Sep 28 11:44:12 2022 +0800
[ISSUE #5195] write data directly when there is no topicsList of publishing (#5196)
---
.../rocketmq/proxy/grpc/v2/client/ClientActivity.java | 8 ++++++--
.../rocketmq/proxy/grpc/v2/client/ClientActivityTest.java | 13 +++++++++++++
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 352e98d81..2192014b5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -253,14 +253,18 @@ public class ClientActivity extends AbstractMessingActivity {
default:
break;
}
- if (grpcClientChannel == null) {
+ if (Settings.PubSubCase.PUBSUB_NOT_SET.equals(settings.getPubSubCase())) {
responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT
.withDescription("there is no publishing or subscription data in settings")
.asRuntimeException());
return;
}
TelemetryCommand command = processClientSettings(ctx, request);
- grpcClientChannel.writeTelemetryCommand(command);
+ if (grpcClientChannel != null) {
+ grpcClientChannel.writeTelemetryCommand(command);
+ } else {
+ responseObserver.onNext(command);
+ }
}
protected TelemetryCommand processClientSettings(ProxyContext ctx,
TelemetryCommand request) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index 8d9089f88..ea045774f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -322,6 +322,19 @@ public class ClientActivityTest extends BaseActivityTest {
}
}
+ @Test
+ public void testEmptyProducerSettings() throws Throwable {
+ ProxyContext context = createContext();
+ TelemetryCommand command = this.sendClientTelemetry(
+ context,
+ Settings.newBuilder()
+ .setClientType(ClientType.PRODUCER)
+ .setPublishing(Publishing.getDefaultInstance())
+ .build()).get();
+ assertTrue(command.hasSettings());
+ assertTrue(command.getSettings().hasPublishing());
+ }
+
@Test
public void testReportThreadStackTrace() {
this.clientActivity = new ClientActivity(this.messagingProcessor,
this.grpcClientSettingsManager, grpcChannelManagerMock);