This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new def1932 Make Consumer thread safe and lock-free (#10352)
def1932 is described below
commit def1932033f4e9c99ff25df20887f282a338718e
Author: feynmanlin <[email protected]>
AuthorDate: Thu May 6 07:08:25 2021 +0800
Make Consumer thread safe and lock-free (#10352)
### Motivation
Lock-free solution for https://github.com/apache/pulsar/pull/10240
---
pulsar-client/pom.xml | 7 +++
.../apache/pulsar/client/impl/ConsumerImpl.java | 68 +++++++++-------------
.../pulsar/client/impl/ClientTestFixtures.java | 1 -
.../pulsar/client/impl/ConsumerImplTest.java | 16 ++++-
.../apache/pulsar/client/impl/ReaderImplTest.java | 20 ++++++-
5 files changed, 66 insertions(+), 46 deletions(-)
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 0fb4449..142671a 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -169,6 +169,13 @@
<version>${skyscreamer.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 48dd929..b0ce397 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -129,8 +130,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final int receiverQueueRefillThreshold;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker
acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
@@ -188,6 +187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final boolean poolMessages;
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
+ private final ExecutorService internalPinnedExecutor;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
@@ -255,6 +255,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
this.expireTimeOfIncompleteChunkedMessageMillis =
conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull =
conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
+ this.internalPinnedExecutor = client.getInternalExecutorService();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -412,25 +413,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
- Message<T> message = null;
- lock.writeLock().lock();
- try {
- message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
+ internalPinnedExecutor.execute(() -> {
+ Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- result.completeExceptionally(e);
- } finally {
- lock.writeLock().unlock();
- }
-
- if (message != null) {
- messageProcessed(message);
- result.complete(beforeConsume(message));
- }
+ if (message != null) {
+ messageProcessed(message);
+ result.complete(beforeConsume(message));
+ }
+ });
return result;
}
@@ -475,8 +468,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result =
cancellationHandler.createFuture();
- lock.writeLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
@@ -498,9 +490,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
- } finally {
- lock.writeLock().unlock();
- }
+ });
return result;
}
@@ -973,15 +963,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private void failPendingReceive() {
- lock.readLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
- } finally {
- lock.readLock().unlock();
- }
+ });
}
void activeConsumerChanged(boolean isActive) {
@@ -1079,11 +1066,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
poolMessages);
uncompressedPayload.release();
- lock.readLock().lock();
- try {
- // Enqueue the message so that it can be retrieved when
application calls receive()
- // if the conf.getReceiverQueueSize() is 0 then discard
message if no one is waiting for it.
- // if asyncReceive is waiting then notify callback without
adding to incomingMessages queue
+ // Enqueue the message so that it can be retrieved when
application calls receive()
+ // if the conf.getReceiverQueueSize() is 0 then discard message if
no one is waiting for it.
+ // if asyncReceive is waiting then notify callback without adding
to incomingMessages queue
+ internalPinnedExecutor.execute(() -> {
if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)
message.getMessageId(),
@@ -1094,16 +1080,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else if (enqueueMessageAndCheckBatchReceive(message) &&
hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
- } finally {
- lock.readLock().unlock();
- }
+ });
} else {
// handle batch message enqueuing; uncompressed payload has all
messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount,
ackSet, uncompressedPayload, messageId, cnx);
uncompressedPayload.release();
}
+ internalPinnedExecutor.execute(()
+ -> tryTriggerListener());
+ }
+
+ private void tryTriggerListener() {
if (listener != null) {
triggerListener();
}
@@ -1306,17 +1295,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
- lock.readLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) &&
hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
- } finally {
- lock.readLock().unlock();
- }
- singleMessagePayload.release();
+ singleMessagePayload.release();
+ });
}
if (ackBitSet != null) {
ackBitSet.recycle();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 8bb7bbc..0adb165 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.Timer;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.mockito.Mockito;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index f3ccfd8..2702439 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -25,12 +25,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
@@ -38,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -49,12 +52,15 @@ public class ConsumerImplTest {
private ExecutorProvider executorProvider;
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
+ private ExecutorService executorService;
@BeforeMethod(alwaysRun = true)
public void setUp() {
executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
+ executorService = Executors.newSingleThreadExecutor();
+ when(client.getInternalExecutorService()).thenReturn(executorService);
ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
@@ -74,6 +80,10 @@ public class ConsumerImplTest {
executorProvider.shutdownNow();
executorProvider = null;
}
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
}
@Test(invocationTimeOut = 1000)
@@ -162,7 +172,7 @@ public class ConsumerImplTest {
public void testReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
- Assert.assertEquals(consumer.peekPendingReceive(), future);
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumer.peekPendingReceive(), future));
// when
future.cancel(true);
// then
@@ -173,7 +183,7 @@ public class ConsumerImplTest {
public void testBatchReceiveAsyncCanBeCancelled() {
// given
CompletableFuture<Messages<byte[]>> future =
consumer.batchReceiveAsync();
- Assert.assertTrue(consumer.hasPendingBatchReceive());
+ Awaitility.await().untilAsserted(() ->
Assert.assertTrue(consumer.hasPendingBatchReceive()));
// when
future.cancel(true);
// then
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index 6e587f7..d0c4023 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -22,32 +22,50 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class ReaderImplTest {
ReaderImpl<byte[]> reader;
+ private ExecutorService executorService;
@BeforeMethod
void setupReader() {
PulsarClientImpl mockedClient =
ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
ReaderConfigurationData<byte[]> readerConfiguration = new
ReaderConfigurationData<>();
readerConfiguration.setTopicName("topicName");
+ executorService = Executors.newSingleThreadExecutor();
+
when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
CompletableFuture<Consumer<byte[]>> consumerFuture = new
CompletableFuture<>();
reader = new ReaderImpl<>(mockedClient, readerConfiguration,
ClientTestFixtures.createMockedExecutorProvider(),
consumerFuture, Schema.BYTES);
}
+ @AfterMethod
+ public void clean() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+ }
+
@Test
void shouldSupportCancellingReadNextAsync() {
// given
CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
- assertNotNull(reader.getConsumer().peekPendingReceive());
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(reader.getConsumer().peekPendingReceive());
+ });
// when
future.cancel(false);