This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 57c5935b Bump rocketmq-proto to 2.0.2 (#364)
57c5935b is described below
commit 57c5935bc5a820b3e66429075d375a9bdf72b4be
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Feb 23 19:50:28 2023 +0800
Bump rocketmq-proto to 2.0.2 (#364)
---
.../org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java | 6 ++++--
.../apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java | 6 ++++--
.../rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java | 2 +-
.../rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java | 2 +-
java/pom.xml | 2 +-
5 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index c4ee9751..3efcb540 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -242,17 +242,19 @@ abstract class ConsumerImpl extends ClientImpl {
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
- FilterExpression filterExpression) {
+ FilterExpression filterExpression, Duration longPollingTimeout) {
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(true).build();
}
ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize,
MessageQueueImpl mq,
- FilterExpression filterExpression, Duration invisibleDuration) {
+ FilterExpression filterExpression, Duration invisibleDuration,
Duration longPollingTimeout) {
final com.google.protobuf.Duration duration =
Durations.fromNanos(invisibleDuration.toNanos());
return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
+
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
.setBatchSize(batchSize).setAutoRenew(false).setInvisibleDuration(duration).build();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 8443c293..ff4a13dc 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -219,7 +219,9 @@ class ProcessQueueImpl implements ProcessQueue {
try {
final Endpoints endpoints = mq.getBroker().getEndpoints();
final int batchSize = this.getReceptionBatchSize();
- final ReceiveMessageRequest request =
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
+ final Duration longPollingTimeout =
consumer.getPushConsumerSettings().getLongPollingTimeout();
+ final ReceiveMessageRequest request =
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
+ longPollingTimeout);
activityNanoTime = System.nanoTime();
// Intercept before message reception.
@@ -227,7 +229,7 @@ class ProcessQueueImpl implements ProcessQueue {
consumer.doBefore(context, Collections.emptyList());
final ListenableFuture<ReceiveMessageResult> future =
consumer.receiveMessage(request, mq,
- consumer.getPushConsumerSettings().getLongPollingTimeout());
+ longPollingTimeout);
Futures.addCallback(future, new
FutureCallback<ReceiveMessageResult>() {
@Override
public void onSuccess(ReceiveMessageResult result) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 4b7ddb13..5d6092a8 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -202,7 +202,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements
SimpleConsumer {
final ListenableFuture<ReceiveMessageResult> future0 =
Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request =
wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
- invisibleDuration);
+ invisibleDuration, awaitDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(future0, result ->
Futures.immediateFuture(result.getMessageViews()),
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 4e497cd9..b4a29717 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -134,7 +134,7 @@ public class ProcessQueueImplTest extends TestBase {
when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request =
ReceiveMessageRequest.newBuilder().build();
when(pushConsumer.wrapReceiveMessageRequest(anyInt(),
any(MessageQueueImpl.class),
- any(FilterExpression.class))).thenReturn(request);
+ any(FilterExpression.class),
any(Duration.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
await().atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(pushConsumer,
times(cachedMessagesCountThresholdPerQueue))
diff --git a/java/pom.xml b/java/pom.xml
index 689b3df1..b97290e6 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,7 +47,7 @@
~ 1. Whether it is essential, because the current shaded jar is
fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
- <rocketmq-proto.version>2.0.1</rocketmq-proto.version>
+ <rocketmq-proto.version>2.0.2</rocketmq-proto.version>
<annotations-api.version>6.0.53</annotations-api.version>
<protobuf.version>3.21.7</protobuf.version>
<grpc.version>1.50.0</grpc.version>