This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 7eeb300 Bugfix: blocking message receiving may exhaust all async
worker threads (#157)
7eeb300 is described below
commit 7eeb30038da4a714e3bfb1cbcac51e6de3debda2
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 18 15:08:13 2022 +0800
Bugfix: blocking message receiving may exhaust all async worker threads
(#157)
---
.../rocketmq/client/java/impl/ClientManager.java | 4 +--
.../client/java/impl/ClientManagerImpl.java | 5 ++--
.../client/java/impl/consumer/ConsumerImpl.java | 8 ++---
.../java/impl/consumer/ProcessQueueImpl.java | 13 +++++++--
.../apache/rocketmq/client/java/rpc/RpcClient.java | 4 +--
.../rocketmq/client/java/rpc/RpcClientImpl.java | 34 +++++++++++++++-------
.../java/impl/consumer/ConsumerImplTest.java | 4 +--
.../apache/rocketmq/client/java/tool/TestBase.java | 5 ++--
8 files changed, 48 insertions(+), 29 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index a115a01..39db0e0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -42,7 +42,7 @@ import com.google.common.util.concurrent.AbstractIdleService;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -118,7 +118,7 @@ public abstract class ClientManager extends
AbstractIdleService {
* @param metadata gRPC request header metadata.
* @return invocation of response future.
*/
- public abstract RpcFuture<ReceiveMessageRequest,
Iterator<ReceiveMessageResponse>>
+ public abstract RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints, Metadata metadata,
ReceiveMessageRequest request, Duration duration);
/**
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 9cf475a..5762a49 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -47,6 +47,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -246,12 +247,12 @@ public class ClientManagerImpl extends ClientManager {
}
@Override
- public RpcFuture<ReceiveMessageRequest, Iterator<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints,
+ public RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints,
Metadata metadata, ReceiveMessageRequest request, Duration duration) {
final Context context = new Context(endpoints, metadata);
try {
final RpcClient rpcClient = getRpcClient(endpoints);
- final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+ final ListenableFuture<List<ReceiveMessageResponse>> future =
rpcClient.receiveMessage(metadata, request, asyncWorker,
duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index aaef423..9c60856 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -42,7 +42,6 @@ import io.grpc.Metadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
@@ -83,16 +82,15 @@ abstract class ConsumerImpl extends ClientImpl {
final Duration tolerance = clientConfiguration.getRequestTimeout();
final Duration timeout = Duration.ofNanos(awaitDuration.toNanos()
+ tolerance.toNanos());
final ClientManager clientManager = this.getClientManager();
- final RpcFuture<ReceiveMessageRequest,
Iterator<ReceiveMessageResponse>> future =
+ final RpcFuture<ReceiveMessageRequest,
List<ReceiveMessageResponse>> future =
clientManager.receiveMessage(endpoints, metadata, request,
timeout);
- return Futures.transformAsync(future, iterator -> {
+ return Futures.transformAsync(future, responses -> {
Status status =
Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
.setMessage("status was not set by server")
.build();
Timestamp deliveryTimestampFromRemote = null;
List<Message> messageList = new ArrayList<>();
- while (iterator.hasNext()) {
- final ReceiveMessageResponse response = iterator.next();
+ for (ReceiveMessageResponse response : responses) {
switch (response.getContentCase()) {
case STATUS:
status = response.getStatus();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 9119bb7..9c106a2 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -109,6 +109,7 @@ class ProcessQueueImpl implements ProcessQueue {
private final AtomicLong receivedMessagesQuantity;
private volatile long activityNanoTime = System.nanoTime();
+ private volatile long cacheFullNanoTime = Long.MIN_VALUE;
public ProcessQueueImpl(PushConsumerImpl consumer, MessageQueueImpl mq,
FilterExpression filterExpression) {
this.consumer = consumer;
@@ -137,14 +138,18 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public boolean expired() {
final PushSubscriptionSettings settings =
consumer.getPushConsumerSettings();
- Duration maxIdleDuration = Duration.ofNanos(2 *
(settings.getLongPollingTimeout().toNanos()
+ Duration maxIdleDuration = Duration.ofNanos(3 *
(settings.getLongPollingTimeout().toNanos()
+
consumer.getClientConfiguration().getRequestTimeout().toNanos()));
final Duration idleDuration = Duration.ofNanos(System.nanoTime() -
activityNanoTime);
if (idleDuration.compareTo(maxIdleDuration) < 0) {
return false;
}
- LOGGER.warn("Process queue is idle, idleDuration={},
maxIdleDuration={}, mq={}, clientId={}", idleDuration,
- maxIdleDuration, mq, consumer.clientId());
+ final Duration afterCacheFullDuration =
Duration.ofNanos(System.nanoTime() - cacheFullNanoTime);
+ if (afterCacheFullDuration.compareTo(maxIdleDuration) < 0) {
+ return false;
+ }
+ LOGGER.warn("Process queue is idle, idleDuration={},
maxIdleDuration={}, afterCacheFullDuration={}, mq={}, "
+ + "clientId={}", idleDuration, maxIdleDuration,
afterCacheFullDuration, mq, consumer.clientId());
return true;
}
@@ -297,6 +302,7 @@ class ProcessQueueImpl implements ProcessQueue {
LOGGER.warn("Process queue total cached messages quantity exceeds
the threshold, threshold={}, actual={}," +
" mq={}, clientId={}", cacheMessageCountThresholdPerQueue,
actualMessagesQuantity, mq,
consumer.clientId());
+ cacheFullNanoTime = System.nanoTime();
return true;
}
final int cacheMessageBytesThresholdPerQueue =
consumer.cacheMessageBytesThresholdPerQueue();
@@ -305,6 +311,7 @@ class ProcessQueueImpl implements ProcessQueue {
LOGGER.warn("Process queue total cached messages memory exceeds
the threshold, threshold={} bytes," +
" actual={} bytes, mq={}, clientId={}",
cacheMessageBytesThresholdPerQueue,
actualCachedMessagesBytes, mq, consumer.clientId());
+ cacheFullNanoTime = System.nanoTime();
return true;
}
return false;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 71cdda1..10f7fa1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -42,7 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -123,7 +123,7 @@ public interface RpcClient {
* @param executor gRPC asynchronous executor.
* @return invocation of response future.
*/
- ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata
metadata,
+ ListenableFuture<List<ReceiveMessageResponse>> receiveMessage(Metadata
metadata,
ReceiveMessageRequest request, ExecutorService executor, Duration
duration);
/**
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 16f733e..310b762 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -40,7 +40,7 @@ import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
@@ -54,9 +54,8 @@ import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.net.InetSocketAddress;
import java.time.Duration;
-import java.util.Iterator;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -70,7 +69,6 @@ public class RpcClientImpl implements RpcClient {
private final ManagedChannel channel;
private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
- private final MessagingServiceGrpc.MessagingServiceBlockingStub
blockingStub;
private final MessagingServiceGrpc.MessagingServiceStub stub;
private long activityNanoTime;
@@ -97,7 +95,6 @@ public class RpcClientImpl implements RpcClient {
}
this.channel = channelBuilder.build();
this.futureStub = MessagingServiceGrpc.newFutureStub(channel);
- this.blockingStub = MessagingServiceGrpc.newBlockingStub(channel);
this.stub = MessagingServiceGrpc.newStub(channel);
this.activityNanoTime = System.nanoTime();
}
@@ -145,13 +142,30 @@ public class RpcClientImpl implements RpcClient {
}
@Override
- public ListenableFuture<Iterator<ReceiveMessageResponse>>
receiveMessage(Metadata metadata,
+ public ListenableFuture<List<ReceiveMessageResponse>>
receiveMessage(Metadata metadata,
ReceiveMessageRequest request, ExecutorService executor, Duration
duration) {
this.activityNanoTime = System.nanoTime();
- final Callable<Iterator<ReceiveMessageResponse>> callable = () ->
blockingStub
-
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).receiveMessage(request);
- return MoreExecutors.listeningDecorator(executor).submit(callable);
+ SettableFuture<List<ReceiveMessageResponse>> future =
SettableFuture.create();
+ List<ReceiveMessageResponse> responses = new ArrayList<>();
+
stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS)
+ .receiveMessage(request, new
StreamObserver<ReceiveMessageResponse>() {
+ @Override
+ public void onNext(ReceiveMessageResponse response) {
+ responses.add(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ future.setException(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ future.set(responses);
+ }
+ });
+ return future;
}
@Override
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
index 8fb6179..a7ae86a 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImplTest.java
@@ -28,7 +28,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Metadata;
import java.time.Duration;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.client.apis.ClientConfiguration;
@@ -65,7 +65,7 @@ public class ConsumerImplTest extends TestBase {
final ClientManager clientManager = Mockito.mock(ClientManager.class);
Mockito.doReturn(clientManager).when(pushConsumer).getClientManager();
int receivedMessageCount = 1;
- final RpcFuture<ReceiveMessageRequest,
Iterator<ReceiveMessageResponse>> future =
+ final RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>>
future =
okReceiveMessageResponsesFuture(FAKE_TOPIC_0,
receivedMessageCount);
future.get();
Mockito.doReturn(future).when(clientManager).receiveMessage(any(Endpoints.class),
any(Metadata.class),
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 00021bc..9a6a1b0 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -54,7 +54,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -335,7 +334,7 @@ public class TestBase {
.setSystemProperties(systemProperties).build();
}
- protected RpcFuture<ReceiveMessageRequest,
Iterator<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(
+ protected RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>>
okReceiveMessageResponsesFuture(
String topic, int messageCount) {
final Status status = Status.newBuilder().setCode(Code.OK).build();
final apache.rocketmq.v2.Message message = fakePbMessage(topic);
@@ -346,7 +345,7 @@ public class TestBase {
ReceiveMessageResponse messageResponse =
ReceiveMessageResponse.newBuilder().setMessage(message).build();
responses.add(messageResponse);
}
- return new RpcFuture<>(fakeRpcContext(), null,
Futures.immediateFuture(responses.iterator()));
+ return new RpcFuture<>(fakeRpcContext(), null,
Futures.immediateFuture(responses));
}
protected ListenableFuture<EndTransactionResponse>
okEndTransactionResponseFuture() {