This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0533816d4 [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible
with gRPC Client <=5.0.2 (#5841)
0533816d4 is described below
commit 0533816d42961d0ba0ea012a115261ea71dd30c4
Author: Aaron Ai <[email protected]>
AuthorDate: Mon Jan 9 17:53:41 2023 +0800
[ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client
<=5.0.2 (#5841)
* Fix #5839: Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client
<=5.0.2
* Merge two tests into testReceiveMessageWithIllegalPollingTime
---
.../grpc/v2/consumer/ReceiveMessageActivity.java | 7 +++-
.../v2/consumer/ReceiveMessageActivityTest.java | 42 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 31b841132..ddbe07083 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -49,6 +49,7 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class ReceiveMessageActivity extends AbstractMessingActivity {
protected ReceiptHandleProcessor receiptHandleProcessor;
+ private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION
= "5.0.3";
public ReceiveMessageActivity(MessagingProcessor messagingProcessor,
ReceiptHandleProcessor receiptHandleProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
@@ -85,7 +86,11 @@ public class ReceiveMessageActivity extends
AbstractMessingActivity {
if (timeRemaining >=
config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = timeRemaining;
} else {
- writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME,
"The deadline time remaining is not enough" +
+ final String clientVersion = ctx.getClientVersion();
+ Code code =
+ null == clientVersion ||
ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ?
+ Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME;
+ writer.writeAndComplete(ctx, code, "The deadline time
remaining is not enough" +
" for polling, please check network condition");
return;
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 4c2f7bd1c..e5aeb025d 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.Settings;
+import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
@@ -112,6 +113,47 @@ public class ReceiveMessageActivityTest extends
BaseActivityTest {
assertEquals(0L, pollTimeCaptor.getValue().longValue());
}
+ @Test
+ public void testReceiveMessageWithIllegalPollingTime() {
+ StreamObserver<ReceiveMessageResponse> receiveStreamObserver =
mock(ServerCallStreamObserver.class);
+ ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor0 =
ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+
doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor0.capture());
+
+
when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+ final ProxyContext context = createContext();
+ context.setClientVersion("5.0.2");
+ context.setRemainingMs(-1L);
+ final ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(false)
+
.setLongPollingTimeout(Duration.newBuilder().setSeconds(20).build())
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build();
+ this.receiveMessageActivity.receiveMessage(
+ context,
+ request,
+ receiveStreamObserver
+ );
+ assertEquals(Code.BAD_REQUEST,
getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor0.getAllValues()));
+
+ ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 =
+ ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+
doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture());
+ context.setClientVersion("5.0.3");
+ this.receiveMessageActivity.receiveMessage(
+ context,
+ request,
+ receiveStreamObserver
+ );
+ assertEquals(Code.ILLEGAL_POLLING_TIME,
+
getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor1.getAllValues()));
+ }
+
@Test
public void testReceiveMessageIllegalFilter() {
StreamObserver<ReceiveMessageResponse> receiveStreamObserver =
mock(ServerCallStreamObserver.class);