This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 318ff3398d39fa16010a664c747adbe77fd44d20 Author: Yunze Xu <[email protected]> AuthorDate: Thu Mar 7 11:32:18 2024 +0800 [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201) (cherry picked from commit 95a53f3a033c4e57db2ddb2d1f0e9a4bc8b9f441) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 104 ++++++++++++++------- .../pulsar/client/impl/ConsumerImplTest.java | 2 +- 3 files changed, 96 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 2f91d792581..d511c6dc37f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -66,6 +66,7 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -761,4 +762,30 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { producer.close(); admin.topics().delete(topic, false); } + + @DataProvider + public static Object[][] initializeLastMessageIdInBroker() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "initializeLastMessageIdInBroker") + public void testHasMessageAvailableAfterSeek(boolean initializeLastMessageIdInBroker) throws Exception { + final String topic = "persistent://my-property/my-ns/test-has-message-available-after-seek"; + @Cleanup Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1) + .startMessageId(MessageId.earliest).create(); + + @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + producer.send("msg"); + + if (initializeLastMessageIdInBroker) { + assertTrue(reader.hasMessageAvailable()); + } // else: lastMessageIdInBroker is earliest + + reader.seek(MessageId.latest); + // lastMessageIdInBroker is the last message ID, while startMessageId is still earliest + assertFalse(reader.hasMessageAvailable()); + + producer.send("msg"); + assertTrue(reader.hasMessageAvailable()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 717355abbef..dae53c59d6e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -166,7 +166,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private volatile MessageIdAdv startMessageId; private volatile MessageIdAdv seekMessageId; - private final AtomicBoolean duringSeek; + @VisibleForTesting + final AtomicReference<SeekStatus> seekStatus; + private volatile CompletableFuture<Void> seekFuture; private final MessageIdAdv initialStartMessageId; @@ -296,7 +298,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle stats = ConsumerStatsDisabled.INSTANCE; } - duringSeek = new AtomicBoolean(false); + seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED); // Create msgCrypto if not created already if (conf.getCryptoKeyReader() != null) { @@ -771,7 +773,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); - clearReceiverQueue(); + clearReceiverQueue(false); return CompletableFuture.completedFuture(null); } @@ -779,7 +781,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle topic, subscription, cnx.ctx().channel(), consumerId); long requestId = client.newRequestId(); - if (duringSeek.get()) { + if (seekStatus.get() != SeekStatus.NOT_STARTED) { acknowledgmentsGroupingTracker.flushAndClean(); } @@ -790,7 +792,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle int currentSize; synchronized (this) { currentSize = incomingMessages.size(); - startMessageId = clearReceiverQueue(); + setClientCnx(cnx); + clearReceiverQueue(true); if (possibleSendToDeadLetterTopicMessages != null) { possibleSendToDeadLetterTopicMessages.clear(); } @@ -828,7 +831,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them final CompletableFuture<Void> future = new CompletableFuture<>(); synchronized (this) { - setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.isReplicateSubscriptionState(), @@ -933,15 +935,24 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application. */ - private MessageIdAdv clearReceiverQueue() { + private void clearReceiverQueue(boolean updateStartMessageId) { List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); resetIncomingMessageSize(); - if (duringSeek.compareAndSet(true, false)) { - return seekMessageId; + CompletableFuture<Void> seekFuture = this.seekFuture; + MessageIdAdv seekMessageId = this.seekMessageId; + + if (seekStatus.get() != SeekStatus.NOT_STARTED) { + if (updateStartMessageId) { + startMessageId = seekMessageId; + } + if (seekStatus.compareAndSet(SeekStatus.COMPLETED, SeekStatus.NOT_STARTED)) { + internalPinnedExecutor.execute(() -> seekFuture.complete(null)); + } + return; } else if (subscriptionMode == SubscriptionMode.Durable) { - return startMessageId; + return; } if (!currentMessageQueue.isEmpty()) { @@ -958,15 +969,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } // release messages if they are pooled messages currentMessageQueue.forEach(Message::release); - return previousMessage; - } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { + if (updateStartMessageId) { + startMessageId = previousMessage; + } + } else if (updateStartMessageId && !lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); - } else { - // No message was received or dequeued by this consumer. Next message would still be the startMessageId - return startMessageId; - } + startMessageId = new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId); + } // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId } /** @@ -2233,25 +2243,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); - CompletableFuture<Void> seekFuture = new CompletableFuture<>(); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture); + if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) { + final String message = String.format( + "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", + topic, subscription, seekBy); + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + return FutureUtil.failedFuture(new IllegalStateException(message)); + } + seekFuture = new CompletableFuture<>(); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs); return seekFuture; } private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy, - final Backoff backoff, final AtomicLong remainingTime, - CompletableFuture<Void> seekFuture) { + final Backoff backoff, final AtomicLong remainingTime) { ClientCnx cnx = cnx(); if (isConnected() && cnx != null) { - if (!duringSeek.compareAndSet(false, true)) { - final String message = String.format( - "[%s][%s] attempting to seek operation that is already in progress (seek by %s)", - topic, subscription, seekBy); - log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", - topic, subscription, seekBy); - seekFuture.completeExceptionally(new IllegalStateException(message)); - return; - } MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); @@ -2263,14 +2271,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle lastDequeuedMessageId = MessageId.earliest; clearIncomingMessages(); - seekFuture.complete(null); + CompletableFuture<Void> future = null; + synchronized (this) { + if (!hasParentConsumer && cnx() == null) { + // It's during reconnection, complete the seek future after connection is established + seekStatus.set(SeekStatus.COMPLETED); + } else { + future = seekFuture; + startMessageId = seekMessageId; + seekStatus.set(SeekStatus.NOT_STARTED); + } + } + if (future != null) { + future.complete(null); + } }).exceptionally(e -> { - // re-set duringSeek and seekMessageId if seek failed seekMessageId = originSeekMessageId; - duringSeek.set(false); log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); - seekFuture.completeExceptionally( + failSeek( PulsarClientException.wrap(e.getCause(), String.format("Failed to seek the subscription %s of the topic %s to %s", subscription, topicName.toString(), seekBy))); @@ -2279,7 +2298,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } else { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { - seekFuture.completeExceptionally( + failSeek( new PulsarClientException.TimeoutException( String.format("The subscription %s of the topic %s could not seek " + "withing configured timeout", subscription, topicName.toString()))); @@ -2290,11 +2309,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms", topic, getHandlerName(), nextDelay); remainingTime.addAndGet(-nextDelay); - seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture); + seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime); }, nextDelay, TimeUnit.MILLISECONDS); } } + private void failSeek(Throwable throwable) { + CompletableFuture<Void> seekFuture = this.seekFuture; + if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, SeekStatus.NOT_STARTED)) { + seekFuture.completeExceptionally(throwable); + } + } + @Override public CompletableFuture<Void> seekAsync(long timestamp) { String seekBy = String.format("the timestamp %d", timestamp); @@ -2952,4 +2978,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); + @VisibleForTesting + enum SeekStatus { + NOT_STARTED, + IN_PROGRESS, + COMPLETED + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 070919c57a4..9995246c175 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -283,6 +283,7 @@ public class ConsumerImplTest { consumer.setClientCnx(cnx); consumer.setState(HandlerState.State.Ready); + consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED); // when CompletableFuture<Void> firstResult = consumer.seekAsync(1L); @@ -290,7 +291,6 @@ public class ConsumerImplTest { clientReq.complete(null); - // then assertTrue(firstResult.isDone()); assertTrue(secondResult.isCompletedExceptionally()); verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
