This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eeb300  Bugfix: blocking message receiveing may exhausts all async 
worker threads (#157)
7eeb300 is described below

commit 7eeb30038da4a714e3bfb1cbcac51e6de3debda2
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 18 15:08:13 2022 +0800

    Bugfix: blocking message receiveing may exhausts all async worker threads 
(#157)
---
 .../rocketmq/client/java/impl/ClientManager.java   |  4 +--
 .../client/java/impl/ClientManagerImpl.java        |  5 ++--
 .../client/java/impl/consumer/ConsumerImpl.java    |  8 ++---
 .../java/impl/consumer/ProcessQueueImpl.java       | 13 +++++++--
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  4 +--
 .../rocketmq/client/java/rpc/RpcClientImpl.java    | 34 +++++++++++++++-------
 .../java/impl/consumer/ConsumerImplTest.java       |  4 +--
 .../apache/rocketmq/client/java/tool/TestBase.java |  5 ++--
 8 files changed, 48 insertions(+), 29 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index a115a01..39db0e0 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -42,7 +42,7 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -118,7 +118,7 @@ public abstract class ClientManager extends 
AbstractIdleService {
      * @param metadata  gRPC request header metadata.
      * @return invocation of response future.
      */
-    public abstract RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>>
+    public abstract RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>>
     receiveMessage(Endpoints endpoints, Metadata metadata, 
ReceiveMessageRequest request, Duration duration);
 
     /**
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 9cf475a..5762a49 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -47,6 +47,7 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -246,12 +247,12 @@ public class ClientManagerImpl extends ClientManager {
     }
 
     @Override
-    public RpcFuture<ReceiveMessageRequest, Iterator<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints,
+    public RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> 
receiveMessage(Endpoints endpoints,
         Metadata metadata, ReceiveMessageRequest request, Duration duration) {
         final Context context = new Context(endpoints, metadata);
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
-            final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+            final ListenableFuture<List<ReceiveMessageResponse>> future =
                 rpcClient.receiveMessage(metadata, request, asyncWorker, 
duration);
             return new RpcFuture<>(context, request, future);
         } catch (Throwable t) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index aaef423..9c60856 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -42,7 +42,6 @@ import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -83,16 +82,15 @@ abstract class ConsumerImpl extends ClientImpl {
             final Duration tolerance = clientConfiguration.getRequestTimeout();
             final Duration timeout = Duration.ofNanos(awaitDuration.toNanos() 
+ tolerance.toNanos());
             final ClientManager clientManager = this.getClientManager();
-            final RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> future =
+            final RpcFuture<ReceiveMessageRequest, 
List<ReceiveMessageResponse>> future =
                 clientManager.receiveMessage(endpoints, metadata, request, 
timeout);
-            return Futures.transformAsync(future, iterator -> {
+            return Futures.transformAsync(future, responses -> {
                 Status status = 
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                     .setMessage("status was not set by server")
                     .build();
                 Timestamp deliveryTimestampFromRemote = null;
                 List<Message> messageList = new ArrayList<>();
-                while (iterator.hasNext()) {
-                    final ReceiveMessageResponse response = iterator.next();
+                for (ReceiveMessageResponse response : responses) {
                     switch (response.getContentCase()) {
                         case STATUS:
                             status = response.getStatus();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 9119bb7..9c106a2 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -109,6 +109,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private final AtomicLong receivedMessagesQuantity;
 
     private volatile long activityNanoTime = System.nanoTime();
+    private volatile long cacheFullNanoTime = Long.MIN_VALUE;
 
     public ProcessQueueImpl(PushConsumerImpl consumer, MessageQueueImpl mq, 
FilterExpression filterExpression) {
         this.consumer = consumer;
@@ -137,14 +138,18 @@ class ProcessQueueImpl implements ProcessQueue {
     @Override
     public boolean expired() {
         final PushSubscriptionSettings settings = 
consumer.getPushConsumerSettings();
-        Duration maxIdleDuration = Duration.ofNanos(2 * 
(settings.getLongPollingTimeout().toNanos()
+        Duration maxIdleDuration = Duration.ofNanos(3 * 
(settings.getLongPollingTimeout().toNanos()
             + 
consumer.getClientConfiguration().getRequestTimeout().toNanos()));
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - 
activityNanoTime);
         if (idleDuration.compareTo(maxIdleDuration) < 0) {
             return false;
         }
-        LOGGER.warn("Process queue is idle, idleDuration={}, 
maxIdleDuration={}, mq={}, clientId={}", idleDuration,
-            maxIdleDuration, mq, consumer.clientId());
+        final Duration afterCacheFullDuration = 
Duration.ofNanos(System.nanoTime() - cacheFullNanoTime);
+        if (afterCacheFullDuration.compareTo(maxIdleDuration) < 0) {
+            return false;
+        }
+        LOGGER.warn("Process queue is idle, idleDuration={}, 
maxIdleDuration={}, afterCacheFullDuration={}, mq={}, "
+            + "clientId={}", idleDuration, maxIdleDuration, 
afterCacheFullDuration, mq, consumer.clientId());
         return true;
     }
 
@@ -297,6 +302,7 @@ class ProcessQueueImpl implements ProcessQueue {
             LOGGER.warn("Process queue total cached messages quantity exceeds 
the threshold, threshold={}, actual={}," +
                     " mq={}, clientId={}", cacheMessageCountThresholdPerQueue, 
actualMessagesQuantity, mq,
                 consumer.clientId());
+            cacheFullNanoTime = System.nanoTime();
             return true;
         }
         final int cacheMessageBytesThresholdPerQueue = 
consumer.cacheMessageBytesThresholdPerQueue();
@@ -305,6 +311,7 @@ class ProcessQueueImpl implements ProcessQueue {
             LOGGER.warn("Process queue total cached messages memory exceeds 
the threshold, threshold={} bytes," +
                     " actual={} bytes, mq={}, clientId={}", 
cacheMessageBytesThresholdPerQueue,
                 actualCachedMessagesBytes, mq, consumer.clientId());
+            cacheFullNanoTime = System.nanoTime();
             return true;
         }
         return false;
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 71cdda1..10f7fa1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -42,7 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
@@ -123,7 +123,7 @@ public interface RpcClient {
      * @param executor gRPC asynchronous executor.
      * @return invocation of response future.
      */
-    ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata 
metadata,
+    ListenableFuture<List<ReceiveMessageResponse>> receiveMessage(Metadata 
metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration 
duration);
 
     /**
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 16f733e..310b762 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -40,7 +40,7 @@ import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.ClientInterceptor;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
@@ -54,9 +54,8 @@ import io.grpc.stub.MetadataUtils;
 import io.grpc.stub.StreamObserver;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Iterator;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -70,7 +69,6 @@ public class RpcClientImpl implements RpcClient {
 
     private final ManagedChannel channel;
     private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
-    private final MessagingServiceGrpc.MessagingServiceBlockingStub 
blockingStub;
     private final MessagingServiceGrpc.MessagingServiceStub stub;
 
     private long activityNanoTime;
@@ -97,7 +95,6 @@ public class RpcClientImpl implements RpcClient {
         }
         this.channel = channelBuilder.build();
         this.futureStub = MessagingServiceGrpc.newFutureStub(channel);
-        this.blockingStub = MessagingServiceGrpc.newBlockingStub(channel);
         this.stub = MessagingServiceGrpc.newStub(channel);
         this.activityNanoTime = System.nanoTime();
     }
@@ -145,13 +142,30 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<Iterator<ReceiveMessageResponse>> 
receiveMessage(Metadata metadata,
+    public ListenableFuture<List<ReceiveMessageResponse>> 
receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration 
duration) {
         this.activityNanoTime = System.nanoTime();
-        final Callable<Iterator<ReceiveMessageResponse>> callable = () -> 
blockingStub
-            
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), 
TimeUnit.NANOSECONDS).receiveMessage(request);
-        return MoreExecutors.listeningDecorator(executor).submit(callable);
+        SettableFuture<List<ReceiveMessageResponse>> future = 
SettableFuture.create();
+        List<ReceiveMessageResponse> responses = new ArrayList<>();
+        
stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS)
+            .receiveMessage(request, new 
StreamObserver<ReceiveMessageResponse>() {
+                @Override
+                public void onNext(ReceiveMessageResponse response) {
+                    responses.add(response);
+                }
+
+                @Override
+                public void onError(Throwable t) {
+                    future.setException(t);
+                }
+
+                @Override
+                public void onCompleted() {
+                    future.set(responses);
+                }
+            });
+        return future;
     }
 
     @Override
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index 8fb6179..a7ae86a 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -28,7 +28,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
 import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
@@ -65,7 +65,7 @@ public class ConsumerImplTest extends TestBase {
         final ClientManager clientManager = Mockito.mock(ClientManager.class);
         Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
         int receivedMessageCount = 1;
-        final RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> future =
+        final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> 
future =
             okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
         future.get();
         
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
 any(Metadata.class),
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 00021bc..9a6a1b0 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -54,7 +54,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -335,7 +334,7 @@ public class TestBase {
             .setSystemProperties(systemProperties).build();
     }
 
-    protected RpcFuture<ReceiveMessageRequest, 
Iterator<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(
+    protected RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> 
okReceiveMessageResponsesFuture(
         String topic, int messageCount) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final apache.rocketmq.v2.Message message = fakePbMessage(topic);
@@ -346,7 +345,7 @@ public class TestBase {
             ReceiveMessageResponse messageResponse = 
ReceiveMessageResponse.newBuilder().setMessage(message).build();
             responses.add(messageResponse);
         }
-        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(responses.iterator()));
+        return new RpcFuture<>(fakeRpcContext(), null, 
Futures.immediateFuture(responses));
     }
 
     protected ListenableFuture<EndTransactionResponse> 
okEndTransactionResponseFuture() {

Reply via email to