This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7d4d7a60ac3 Revert "[fix][client] Seek should be thread-safe (#20242)"
7d4d7a60ac3 is described below
commit 7d4d7a60ac346aab5469685270930462fd7ad3e4
Author: tison <[email protected]>
AuthorDate: Mon May 15 10:47:00 2023 +0800
Revert "[fix][client] Seek should be thread-safe (#20242)"
This reverts commit bc1764f9ef71dd31e8cd61c7571e493442bc6395.
(cherry picked from commit 7b54664c364a7360f30cd37f0aa989ab13a14af4)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 109 +++++++++------------
.../pulsar/client/impl/ConsumerImplTest.java | 27 +----
2 files changed, 50 insertions(+), 86 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6c64fe04069..199e8a9ae71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -208,7 +208,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
-
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T>
conf,
@@ -252,12 +251,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
- ExecutorProvider executorProvider, int
partitionIndex, boolean hasParentConsumer,
- boolean parentConsumerHasListener,
CompletableFuture<Consumer<T>> subscribeFuture,
- MessageId startMessageId,
- long startMessageRollbackDurationInSec, Schema<T>
schema,
- ConsumerInterceptors<T> interceptors,
- boolean createTopicIfDoesNotExist) {
+ ExecutorProvider executorProvider, int partitionIndex, boolean
hasParentConsumer,
+ boolean parentConsumerHasListener, CompletableFuture<Consumer<T>>
subscribeFuture, MessageId startMessageId,
+ long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(),
executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
@@ -322,21 +319,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
this.connectionHandler = new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create(),
+ new BackoffBuilder()
+
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+ TimeUnit.NANOSECONDS)
+
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create(),
this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
- new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
+ new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
- NonPersistentAcknowledgmentGroupingTracker.of();
+ NonPersistentAcknowledgmentGroupingTracker.of();
}
if (conf.getDeadLetterPolicy() != null) {
@@ -413,16 +410,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.error("[{}][{}] Failed to unsubscribe: {}", topic,
subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to unsubscribe the
subscription %s of topic %s",
- topicName.toString(), subscription)));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to unsubscribe the subscription
%s of topic %s",
+ topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
- new PulsarClientException(
- String.format("The client is not connected to the
broker when unsubscribing the "
- + "subscription %s of the topic %s",
subscription, topicName.toString())));
+ new PulsarClientException(
+ String.format("The client is not connected to the broker
when unsubscribing the "
+ + "subscription %s of the topic %s", subscription,
topicName.toString())));
}
return unsubscribeFuture;
}
@@ -1403,7 +1400,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf processMessageChunk(ByteBuf compressedPayload,
MessageMetadata msgMetadata, MessageIdImpl msgId,
- MessageIdData messageId, ClientCnx
cnx) {
+ MessageIdData messageId, ClientCnx cnx) {
// Lazy task scheduling to expire incomplete chunk message
if (expireTimeOfIncompleteChunkedMessageMillis > 0 &&
expireChunkMessageTaskScheduled.compareAndSet(false,
@@ -1448,7 +1445,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() >
(msgMetadata.getPublishTime()
- + expireTimeOfIncompleteChunkedMessageMillis)) {
+ + expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual,
Collections.emptyMap(), null);
} else {
trackMessage(msgId);
@@ -1639,7 +1636,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
protected void trackMessage(MessageId messageId) {
- trackMessage(messageId, 0);
+ trackMessage(messageId, 0);
}
protected void trackMessage(MessageId messageId, int redeliveryCount) {
@@ -1780,7 +1777,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId,
MessageMetadata msgMetadata, ByteBuf payload,
- ClientCnx currentCnx, boolean
checkMaxMessageSize) {
+ ClientCnx currentCnx, boolean checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
@@ -1833,7 +1830,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
private void discardCorruptedMessage(MessageIdData messageId, ClientCnx
currentCnx,
- ValidationError validationError) {
+ ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic,
subscription, messageId.getLedgerId(),
messageId.getEntryId());
discardMessage(messageId, currentCnx, validationError);
@@ -2025,8 +2022,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .value(message.getData())
- .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
+ .value(message.getData())
+ .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
@@ -2055,7 +2052,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
result.complete(false);
return null;
- });
+ });
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}",
deadLetterPolicy.getDeadLetterTopic(), ex);
@@ -2154,15 +2151,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- if (!duringSeek.compareAndSet(false, true)) {
- log.warn("[{}][{}] Attempting to seek operation that is already in
progress, cancelling {}",
- topic, subscription, seekBy);
- seekFuture.cancel(true);
- return seekFuture;
- }
-
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
+ duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription,
seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
@@ -2180,9 +2171,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s
of the topic %s to %s",
- subscription, topicName.toString(),
seekBy)));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to seek the subscription %s of the
topic %s to %s",
+ subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
@@ -2194,7 +2185,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId,
requestId, timestamp),
- MessageId.earliest, seekBy);
+ MessageId.earliest, seekBy);
});
}
@@ -2360,11 +2351,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
public CompletableFuture<GetLastMessageIdResponse>
internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
- .failedFuture(new
PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed
when the subscription %s of the topic %s "
- + "getting the last message id",
consumerName, subscription,
- topicName.toString())));
- }
+ .failedFuture(new PulsarClientException.AlreadyClosedException(
+ String.format("The consumer %s was already closed when the
subscription %s of the topic %s "
+ + "getting the last message id", consumerName,
subscription, topicName.toString())));
+ }
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
@@ -2386,12 +2376,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (isConnected() && cnx != null) {
if
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
{
future.completeExceptionally(
- new PulsarClientException.NotSupportedException(
- String.format(
- "The command `GetLastMessageId` is not
supported for the protocol version %d. "
- + "The consumer is %s, topic
%s, subscription %s",
- cnx.getRemoteEndpointProtocolVersion(),
- consumerName, topicName.toString(),
subscription)));
+ new PulsarClientException.NotSupportedException(
+ String.format("The command `GetLastMessageId` is not
supported for the protocol version %d. "
+ + "The consumer is %s, topic %s,
subscription %s",
+ cnx.getRemoteEndpointProtocolVersion(),
+ consumerName, topicName.toString(),
subscription)));
return;
}
@@ -2410,31 +2399,31 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
- topic, subscription, lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
+ topic, subscription, lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
}
MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
? new MessageIdImpl(lastMessageId.getLedgerId(),
- lastMessageId.getEntryId(),
lastMessageId.getPartition())
+ lastMessageId.getEntryId(),
lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(),
- lastMessageId.getPartition(),
lastMessageId.getBatchIndex());
+ lastMessageId.getPartition(),
lastMessageId.getBatchIndex());
future.complete(new GetLastMessageIdResponse(lastMsgId,
markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic,
subscription);
future.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("The subscription %s of the
topic %s gets the last message id was failed",
- subscription, topicName.toString())));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("The subscription %s of the topic %s
gets the last message id was failed",
+ subscription, topicName.toString())));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
- new PulsarClientException.TimeoutException(
- String.format("The subscription %s of the
topic %s could not get the last message id "
- + "withing configured timeout",
subscription, topicName.toString())));
+ new PulsarClientException.TimeoutException(
+ String.format("The subscription %s of the topic %s
could not get the last message id "
+ + "withing configured timeout", subscription,
topicName.toString())));
return;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index ab11675f543..29d180f5f9a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -27,9 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
@@ -262,26 +259,4 @@ public class ConsumerImplTest {
assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}
-
- @Test(invocationTimeOut = 1000)
- public void testSeekAsyncInternal() {
- // given
- ClientCnx cnx = mock(ClientCnx.class);
- CompletableFuture<ProducerResponse> clientReq = new
CompletableFuture<>();
- when(cnx.sendRequestWithId(any(ByteBuf.class),
anyLong())).thenReturn(clientReq);
-
- consumer.setClientCnx(cnx);
- consumer.setState(HandlerState.State.Ready);
-
- // when
- CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
- CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
-
- clientReq.complete(null);
-
- // then
- assertTrue(firstResult.isDone());
- assertTrue(secondResult.isCancelled());
- verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
- }
}