eolivelli commented on a change in pull request #8207:
URL: https://github.com/apache/pulsar/pull/8207#discussion_r500767874
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T>
consumer) {
messageReceived(consumer, message);
// we're modifying pausedConsumers
- lock.writeLock().lock();
- try {
- int size = incomingMessages.size();
- if (size >= maxReceiverQueueSize
- || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
- // mark this consumer to be resumed later: if No more
space left in shared queue,
- // or if any consumer is already paused (to create fair
chance for already paused consumers)
- pausedConsumers.add(consumer);
- } else {
- // Schedule next receiveAsync() if the incoming queue is
not full. Use a different thread to avoid
- // recursion and stack overflow
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- } finally {
- lock.writeLock().unlock();
+ int size = incomingMessages.size();
+ if (size >= maxReceiverQueueSize
+ || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
+ // mark this consumer to be resumed later: if No more space
left in shared queue,
+ // or if any consumer is already paused (to create fair chance
for already paused consumers)
+ pausedConsumers.add(consumer);
+ } else {
+ // Schedule next receiveAsync() if the incoming queue is not
full. Use a different thread to avoid
+ // recursion and stack overflow
+ client.eventLoopGroup().execute(() -> {
+ receiveMessageFromConsumer(consumer);
+ });
}
});
}
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message)
{
checkArgument(message instanceof MessageImpl);
- lock.writeLock().lock();
- try {
- TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+ TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(),
message);
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received message from topics-consumer {}",
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Received message from topics-consumer {}",
topic, subscription, message.getMessageId());
- }
+ }
- // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
- if (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receivedFuture =
pendingReceives.poll();
- unAckedMessageTracker.add(topicMessage.getMessageId());
- listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
- } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
- if (hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
+ // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
+ CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+ if (receivedFuture != null) {
+ unAckedMessageTracker.add(topicMessage.getMessageId());
+ listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
+ } else if (enqueueMessageAndCheckBatchReceive(topicMessage) &&
hasPendingBatchReceive()) {
+ try {
+ lock.writeLock().lock();
Review comment:
We are now using only the write lock, so we could use a simple ReentrantLock
instead of a ReentrantReadWriteLock.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T>
consumer) {
messageReceived(consumer, message);
// we're modifying pausedConsumers
- lock.writeLock().lock();
- try {
- int size = incomingMessages.size();
- if (size >= maxReceiverQueueSize
- || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
- // mark this consumer to be resumed later: if No more
space left in shared queue,
- // or if any consumer is already paused (to create fair
chance for already paused consumers)
- pausedConsumers.add(consumer);
- } else {
- // Schedule next receiveAsync() if the incoming queue is
not full. Use a different thread to avoid
- // recursion and stack overflow
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- } finally {
- lock.writeLock().unlock();
+ int size = incomingMessages.size();
+ if (size >= maxReceiverQueueSize
+ || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
+ // mark this consumer to be resumed later: if No more space
left in shared queue,
+ // or if any consumer is already paused (to create fair chance
for already paused consumers)
+ pausedConsumers.add(consumer);
+ } else {
+ // Schedule next receiveAsync() if the incoming queue is not
full. Use a different thread to avoid
+ // recursion and stack overflow
+ client.eventLoopGroup().execute(() -> {
+ receiveMessageFromConsumer(consumer);
+ });
}
});
}
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message)
{
checkArgument(message instanceof MessageImpl);
- lock.writeLock().lock();
- try {
- TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
+ TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(),
message);
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received message from topics-consumer {}",
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Received message from topics-consumer {}",
topic, subscription, message.getMessageId());
- }
+ }
- // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
- if (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receivedFuture =
pendingReceives.poll();
- unAckedMessageTracker.add(topicMessage.getMessageId());
- listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
- } else if (enqueueMessageAndCheckBatchReceive(topicMessage)) {
- if (hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
+ // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
+ CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+ if (receivedFuture != null) {
+ unAckedMessageTracker.add(topicMessage.getMessageId());
+ listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
+ } else if (enqueueMessageAndCheckBatchReceive(topicMessage) &&
hasPendingBatchReceive()) {
+ try {
+ lock.writeLock().lock();
+ notifyPendingBatchReceivedCallBack();
Review comment:
Not every call to the `notifyPendingBatchReceivedCallBack` method is
guarded by the lock. Do we need to use the lock here?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -219,51 +219,44 @@ private void receiveMessageFromConsumer(ConsumerImpl<T>
consumer) {
messageReceived(consumer, message);
// we're modifying pausedConsumers
Review comment:
This comment is probably no longer useful.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org