This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new cea5343e Implement the reentrant PushConsumer message receiving (#547)
cea5343e is described below

commit cea5343ea8a2d2fee2614be31164bf7844ef8930
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Jun 21 10:55:47 2023 +0800

    Implement the reentrant PushConsumer message receiving (#547)
---
 .../client/java/impl/consumer/ConsumerImpl.java    |  6 ++-
 .../java/impl/consumer/ProcessQueueImpl.java       | 48 ++++++++++++++++------
 .../java/impl/consumer/ConsumerImplTest.java       |  3 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |  3 +-
 4 files changed, 43 insertions(+), 17 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 5175b4dd..a807fd28 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
@@ -242,11 +243,12 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
-        FilterExpression filterExpression, Duration longPollingTimeout) {
+        FilterExpression filterExpression, Duration longPollingTimeout, String 
attemptId) {
+        attemptId = null == attemptId ? UUID.randomUUID().toString() : 
attemptId;
         return ReceiveMessageRequest.newBuilder().setGroup(getProtobufGroup())
             
.setMessageQueue(mq.toProtobuf()).setFilterExpression(wrapFilterExpression(filterExpression))
             
.setLongPollingTimeout(Durations.fromNanos(longPollingTimeout.toNanos()))
-            .setBatchSize(batchSize).setAutoRenew(true).build();
+            
.setBatchSize(batchSize).setAutoRenew(true).setAttemptId(attemptId).build();
     }
 
     ReceiveMessageRequest wrapReceiveMessageRequest(int batchSize, 
MessageQueueImpl mq,
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index a80f90a2..c43b4a10 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -32,10 +32,12 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
+import io.grpc.StatusRuntimeException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -175,29 +177,37 @@ class ProcessQueueImpl implements ProcessQueue {
      *
      * <p> Make sure that no exception will be thrown.
      */
-    public void onReceiveMessageException(Throwable t) {
+    public void onReceiveMessageException(Throwable t, String attemptId) {
         Duration delay = t instanceof TooManyRequestsException ? 
RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
             RECEIVING_FAILURE_BACKOFF_DELAY;
-        receiveMessageLater(delay);
+        receiveMessageLater(delay, attemptId);
     }
 
-    private void receiveMessageLater(Duration delay) {
+    private void receiveMessageLater(Duration delay, String attemptId) {
         final ClientId clientId = consumer.getClientId();
         final ScheduledExecutorService scheduler = consumer.getScheduler();
         try {
             log.info("Try to receive message later, mq={}, delay={}, 
clientId={}", mq, delay, clientId);
-            scheduler.schedule(this::receiveMessage, delay.toNanos(), 
TimeUnit.NANOSECONDS);
+            scheduler.schedule(() -> receiveMessage(attemptId), 
delay.toNanos(), TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
             if (scheduler.isShutdown()) {
                 return;
             }
             // Should never reach here.
             log.error("[Bug] Failed to schedule message receiving request, 
mq={}, clientId={}", mq, clientId, t);
-            onReceiveMessageException(t);
+            onReceiveMessageException(t, attemptId);
         }
     }
 
+    private String generateAttemptId() {
+        return UUID.randomUUID().toString();
+    }
+
     public void receiveMessage() {
+        receiveMessage(this.generateAttemptId());
+    }
+
+    public void receiveMessage(String attemptId) {
         final ClientId clientId = consumer.getClientId();
         if (dropped) {
             log.info("Process queue has been dropped, no longer receive 
message, mq={}, clientId={}", mq, clientId);
@@ -205,13 +215,17 @@ class ProcessQueueImpl implements ProcessQueue {
         }
         if (this.isCacheFull()) {
             log.warn("Process queue cache is full, would receive message 
later, mq={}, clientId={}", mq, clientId);
-            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
+            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, 
attemptId);
             return;
         }
-        receiveMessageImmediately();
+        receiveMessageImmediately(attemptId);
     }
 
     private void receiveMessageImmediately() {
+        receiveMessageImmediately(this.generateAttemptId());
+    }
+
+    private void receiveMessageImmediately(String attemptId) {
         final ClientId clientId = consumer.getClientId();
         if (!consumer.isRunning()) {
             log.info("Stop to receive message because consumer is not running, 
mq={}, clientId={}", mq, clientId);
@@ -222,7 +236,7 @@ class ProcessQueueImpl implements ProcessQueue {
             final int batchSize = this.getReceptionBatchSize();
             final Duration longPollingTimeout = 
consumer.getPushConsumerSettings().getLongPollingTimeout();
             final ReceiveMessageRequest request = 
consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
-                longPollingTimeout);
+                longPollingTimeout, attemptId);
             activityNanoTime = System.nanoTime();
 
             // Intercept before message reception.
@@ -248,27 +262,35 @@ class ProcessQueueImpl implements ProcessQueue {
                         // Should never reach here.
                         log.error("[Bug] Exception raised while handling 
receive result, mq={}, endpoints={}, "
                             + "clientId={}", mq, endpoints, clientId, t);
-                        onReceiveMessageException(t);
+                        onReceiveMessageException(t, attemptId);
                     }
                 }
 
                 @Override
                 public void onFailure(Throwable t) {
+                    String nextAttemptId = null;
+                    if (t instanceof StatusRuntimeException) {
+                        StatusRuntimeException exception = 
(StatusRuntimeException) t;
+                        if 
(io.grpc.Status.DEADLINE_EXCEEDED.equals(exception.getStatus())) {
+                            nextAttemptId = request.getAttemptId();
+                        }
+                    }
                     // Intercept after message reception.
                     final MessageInterceptorContextImpl context0 =
                         new MessageInterceptorContextImpl(context, 
MessageHookPointsStatus.ERROR);
                     consumer.doAfter(context0, Collections.emptyList());
 
-                    log.error("Exception raised during message reception, 
mq={}, endpoints={}, clientId={}", mq,
-                        endpoints, clientId, t);
-                    onReceiveMessageException(t);
+                    log.error("Exception raised during message reception, 
mq={}, endpoints={}, attemptId={}, " +
+                            "nextAttemptId={}, clientId={}", mq, endpoints, 
request.getAttemptId(), nextAttemptId,
+                        clientId, t);
+                    onReceiveMessageException(t, nextAttemptId);
                 }
             }, MoreExecutors.directExecutor());
             receptionTimes.getAndIncrement();
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
             log.error("Exception raised during message reception, mq={}, 
clientId={}", mq, clientId, t);
-            onReceiveMessageException(t);
+            onReceiveMessageException(t, attemptId);
         }
     }
 
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index 619cd7d2..c7a12140 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -71,7 +72,7 @@ public class ConsumerImplTest extends TestBase {
             any(ReceiveMessageRequest.class), any(Duration.class));
         final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
         final ReceiveMessageRequest request = 
pushConsumer.wrapReceiveMessageRequest(1,
-            mq, new FilterExpression(), Duration.ofSeconds(15));
+            mq, new FilterExpression(), Duration.ofSeconds(15), 
UUID.randomUUID().toString());
         final ListenableFuture<ReceiveMessageResult> future0 =
             pushConsumer.receiveMessage(request, mq, Duration.ofSeconds(15));
         final ReceiveMessageResult receiveMessageResult = future0.get();
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index b4a29717..d9aa61fb 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -134,7 +135,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(pushSubscriptionSettings.getReceiveBatchSize()).thenReturn(32);
         ReceiveMessageRequest request = 
ReceiveMessageRequest.newBuilder().build();
         when(pushConsumer.wrapReceiveMessageRequest(anyInt(), 
any(MessageQueueImpl.class),
-            any(FilterExpression.class), 
any(Duration.class))).thenReturn(request);
+            any(FilterExpression.class), any(Duration.class), 
nullable(String.class))).thenReturn(request);
         processQueue.fetchMessageImmediately();
         await().atMost(Duration.ofSeconds(3))
             .untilAsserted(() -> verify(pushConsumer, 
times(cachedMessagesCountThresholdPerQueue))

Reply via email to