This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9d08f64 Remove consumer unnecessary locks (#9261)
9d08f64 is described below
commit 9d08f64a827ad670c89cf708ddd409e1d5b5f763
Author: hangc0276 <[email protected]>
AuthorDate: Thu Apr 1 10:45:40 2021 +0800
Remove consumer unnecessary locks (#9261)
### Motivation
1. `ConsumerImpl` takes many unnecessary locks around queues that are already
thread-safe, such as `Queues.newConcurrentLinkedQueue`, `GrowableArrayBlockingQueue`,
and `ConcurrentLinkedQueue`.
### Changes
1. Remove unnecessary locks in `ConsumerImpl` and `MultiTopicsConsumerImpl`.
Related to PR #8207.
---
.../apache/pulsar/client/impl/ConsumerBase.java | 6 ++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 60 ++++++++--------------
.../client/impl/MultiTopicsConsumerImpl.java | 23 ++++-----
3 files changed, 35 insertions(+), 54 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dfce6ed..28c248f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -35,10 +35,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import io.netty.util.Timeout;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -733,6 +733,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
if (opBatchReceive == null) {
return;
}
+
try {
reentrantLock.lock();
notifyPendingBatchReceivedCallBack(opBatchReceive);
@@ -790,6 +791,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
msgPeeked = incomingMessages.peek();
}
+
completePendingBatchReceive(opBatchReceive.future, messages);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7f67f4..d57fc39 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -409,7 +409,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
Message<T> message = null;
try {
- lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
pendingReceives.add(result);
@@ -418,8 +417,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
- } finally {
- lock.writeLock().unlock();
}
if (message != null) {
@@ -496,6 +493,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} finally {
lock.writeLock().unlock();
}
+
return result;
}
@@ -948,14 +946,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private void failPendingReceive() {
- lock.readLock().lock();
- try {
- if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
- failPendingReceives(this.pendingReceives);
- failPendingBatchReceives(this.pendingBatchReceives);
- }
- } finally {
- lock.readLock().unlock();
+ if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
+ failPendingReceives(this.pendingReceives);
+ failPendingBatchReceives(this.pendingBatchReceives);
}
}
@@ -1053,23 +1046,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
uncompressedPayload, createEncryptionContext(msgMetadata),
cnx, schema, redeliveryCount);
uncompressedPayload.release();
- lock.readLock().lock();
- try {
- // Enqueue the message so that it can be retrieved when
application calls receive()
- // if the conf.getReceiverQueueSize() is 0 then discard
message if no one is waiting for it.
- // if asyncReceive is waiting then notify callback without
adding to incomingMessages queue
- if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
-
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
- }
- if (peekPendingReceive() != null) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message)) {
- if (hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
- }
- } finally {
- lock.readLock().unlock();
+ // Enqueue the message so that it can be retrieved when
application calls receive()
+ // if the conf.getReceiverQueueSize() is 0 then discard message if
no one is waiting for it.
+ // if asyncReceive is waiting then notify callback without adding
to incomingMessages queue
+ if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null &&
+ redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
+
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
+ Collections.singletonList(message));
+ }
+ if (peekPendingReceive() != null) {
+ notifyPendingReceivedCallback(message, null);
+ } else if (enqueueMessageAndCheckBatchReceive(message) &&
hasPendingBatchReceive()) {
+ notifyPendingBatchReceivedCallBack();
}
} else {
// handle batch message enqueuing; uncompressed payload has all
messages in batch
@@ -1280,17 +1268,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
- lock.readLock().lock();
- try {
- if (peekPendingReceive() != null) {
- notifyPendingReceivedCallback(message, null);
- } else if (enqueueMessageAndCheckBatchReceive(message)) {
- if (hasPendingBatchReceive()) {
- notifyPendingBatchReceivedCallBack();
- }
- }
- } finally {
- lock.readLock().unlock();
+
+ if (peekPendingReceive() != null) {
+ notifyPendingReceivedCallback(message, null);
+ } else if (enqueueMessageAndCheckBatchReceive(message) &&
hasPendingBatchReceive()) {
+ notifyPendingBatchReceivedCallBack();
}
singleMessagePayload.release();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 39f7715..5263401 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -97,8 +97,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-
private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
@@ -386,6 +386,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
} finally {
lock.writeLock().unlock();
}
+
return result;
}
@@ -595,18 +596,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
- lock.writeLock().lock();
- try {
- consumers.values().stream().forEach(consumer -> {
- consumer.redeliverUnacknowledgedMessages();
- consumer.unAckedChunkedMessageIdSequenceMap.clear();
- });
- incomingMessages.clear();
- resetIncomingMessageSize();
- unAckedMessageTracker.clear();
- } finally {
- lock.writeLock().unlock();
- }
+ consumers.values().stream().forEach(consumer -> {
+ consumer.redeliverUnacknowledgedMessages();
+ consumer.unAckedChunkedMessageIdSequenceMap.clear();
+ });
+ incomingMessages.clear();
+ resetIncomingMessageSize();
+ unAckedMessageTracker.clear();
+
resumeReceivingFromPausedConsumersIfNeeded();
}