This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc1764f9ef7 [fix][client] Seek should be thread-safe (#20242)
bc1764f9ef7 is described below
commit bc1764f9ef71dd31e8cd61c7571e493442bc6395
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Mon May 8 20:57:20 2023 +0900
[fix][client] Seek should be thread-safe (#20242)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 109 ++++++++++++---------
.../pulsar/client/impl/ConsumerImplTest.java | 27 ++++-
2 files changed, 86 insertions(+), 50 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 199e8a9ae71..6c64fe04069 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -208,6 +208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
+
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T>
conf,
@@ -251,10 +252,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
- ExecutorProvider executorProvider, int partitionIndex, boolean
hasParentConsumer,
- boolean parentConsumerHasListener, CompletableFuture<Consumer<T>>
subscribeFuture, MessageId startMessageId,
- long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
- boolean createTopicIfDoesNotExist) {
+ ExecutorProvider executorProvider, int
partitionIndex, boolean hasParentConsumer,
+ boolean parentConsumerHasListener,
CompletableFuture<Consumer<T>> subscribeFuture,
+ MessageId startMessageId,
+ long startMessageRollbackDurationInSec, Schema<T>
schema,
+ ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(),
executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
@@ -319,21 +322,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
this.connectionHandler = new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create(),
+ new BackoffBuilder()
+
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+ TimeUnit.NANOSECONDS)
+
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create(),
this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
- new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
+ new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
- NonPersistentAcknowledgmentGroupingTracker.of();
+ NonPersistentAcknowledgmentGroupingTracker.of();
}
if (conf.getDeadLetterPolicy() != null) {
@@ -410,16 +413,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.error("[{}][{}] Failed to unsubscribe: {}", topic,
subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to unsubscribe the subscription
%s of topic %s",
- topicName.toString(), subscription)));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to unsubscribe the
subscription %s of topic %s",
+ topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
- new PulsarClientException(
- String.format("The client is not connected to the broker
when unsubscribing the "
- + "subscription %s of the topic %s", subscription,
topicName.toString())));
+ new PulsarClientException(
+ String.format("The client is not connected to the
broker when unsubscribing the "
+ + "subscription %s of the topic %s",
subscription, topicName.toString())));
}
return unsubscribeFuture;
}
@@ -1400,7 +1403,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf processMessageChunk(ByteBuf compressedPayload,
MessageMetadata msgMetadata, MessageIdImpl msgId,
- MessageIdData messageId, ClientCnx cnx) {
+ MessageIdData messageId, ClientCnx
cnx) {
// Lazy task scheduling to expire incomplete chunk message
if (expireTimeOfIncompleteChunkedMessageMillis > 0 &&
expireChunkMessageTaskScheduled.compareAndSet(false,
@@ -1445,7 +1448,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() >
(msgMetadata.getPublishTime()
- + expireTimeOfIncompleteChunkedMessageMillis)) {
+ + expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual,
Collections.emptyMap(), null);
} else {
trackMessage(msgId);
@@ -1636,7 +1639,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
protected void trackMessage(MessageId messageId) {
- trackMessage(messageId, 0);
+ trackMessage(messageId, 0);
}
protected void trackMessage(MessageId messageId, int redeliveryCount) {
@@ -1777,7 +1780,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId,
MessageMetadata msgMetadata, ByteBuf payload,
- ClientCnx currentCnx, boolean checkMaxMessageSize) {
+ ClientCnx currentCnx, boolean
checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
@@ -1830,7 +1833,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private void discardCorruptedMessage(MessageIdData messageId, ClientCnx
currentCnx,
- ValidationError validationError) {
+ ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic,
subscription, messageId.getLedgerId(),
messageId.getEntryId());
discardMessage(messageId, currentCnx, validationError);
@@ -2022,8 +2025,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .value(message.getData())
- .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
+ .value(message.getData())
+ .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
@@ -2052,7 +2055,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
result.complete(false);
return null;
- });
+ });
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}",
deadLetterPolicy.getDeadLetterTopic(), ex);
@@ -2151,9 +2154,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
+ if (!duringSeek.compareAndSet(false, true)) {
+ log.warn("[{}][{}] Attempting to seek operation that is already in
progress, cancelling {}",
+ topic, subscription, seekBy);
+ seekFuture.cancel(true);
+ return seekFuture;
+ }
+
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
- duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription,
seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
@@ -2171,9 +2180,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the
topic %s to %s",
- subscription, topicName.toString(), seekBy)));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to seek the subscription %s
of the topic %s to %s",
+ subscription, topicName.toString(),
seekBy)));
return null;
});
return seekFuture;
@@ -2185,7 +2194,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId,
requestId, timestamp),
- MessageId.earliest, seekBy);
+ MessageId.earliest, seekBy);
});
}
@@ -2351,10 +2360,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
public CompletableFuture<GetLastMessageIdResponse>
internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when the
subscription %s of the topic %s "
- + "getting the last message id", consumerName,
subscription, topicName.toString())));
- }
+ .failedFuture(new
PulsarClientException.AlreadyClosedException(
+ String.format("The consumer %s was already closed
when the subscription %s of the topic %s "
+ + "getting the last message id",
consumerName, subscription,
+ topicName.toString())));
+ }
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
@@ -2376,11 +2386,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (isConnected() && cnx != null) {
if
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
{
future.completeExceptionally(
- new PulsarClientException.NotSupportedException(
- String.format("The command `GetLastMessageId` is not
supported for the protocol version %d. "
- + "The consumer is %s, topic %s,
subscription %s",
- cnx.getRemoteEndpointProtocolVersion(),
- consumerName, topicName.toString(),
subscription)));
+ new PulsarClientException.NotSupportedException(
+ String.format(
+ "The command `GetLastMessageId` is not
supported for the protocol version %d. "
+ + "The consumer is %s, topic
%s, subscription %s",
+ cnx.getRemoteEndpointProtocolVersion(),
+ consumerName, topicName.toString(),
subscription)));
return;
}
@@ -2399,31 +2410,31 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
- topic, subscription, lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
+ topic, subscription, lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
}
MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
? new MessageIdImpl(lastMessageId.getLedgerId(),
- lastMessageId.getEntryId(),
lastMessageId.getPartition())
+ lastMessageId.getEntryId(),
lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(),
- lastMessageId.getPartition(),
lastMessageId.getBatchIndex());
+ lastMessageId.getPartition(),
lastMessageId.getBatchIndex());
future.complete(new GetLastMessageIdResponse(lastMsgId,
markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic,
subscription);
future.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("The subscription %s of the topic %s
gets the last message id was failed",
- subscription, topicName.toString())));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("The subscription %s of the
topic %s gets the last message id was failed",
+ subscription, topicName.toString())));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
- new PulsarClientException.TimeoutException(
- String.format("The subscription %s of the topic %s
could not get the last message id "
- + "withing configured timeout", subscription,
topicName.toString())));
+ new PulsarClientException.TimeoutException(
+ String.format("The subscription %s of the
topic %s could not get the last message id "
+ + "withing configured timeout",
subscription, topicName.toString())));
return;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 29d180f5f9a..ab11675f543 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -26,7 +27,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
@@ -259,4 +262,26 @@ public class ConsumerImplTest {
assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}
+
+ @Test(invocationTimeOut = 1000)
+ public void testSeekAsyncInternal() {
+ // given
+ ClientCnx cnx = mock(ClientCnx.class);
+ CompletableFuture<ProducerResponse> clientReq = new
CompletableFuture<>();
+ when(cnx.sendRequestWithId(any(ByteBuf.class),
anyLong())).thenReturn(clientReq);
+
+ consumer.setClientCnx(cnx);
+ consumer.setState(HandlerState.State.Ready);
+
+ // when
+ CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
+ CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
+
+ clientReq.complete(null);
+
+ // then
+ assertTrue(firstResult.isDone());
+ assertTrue(secondResult.isCancelled());
+ verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
+ }
}