This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1045f8be626981c818c6eddf30f5732d25dbac66
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 27 19:49:27 2024 +0800

    [fix][client] Fix wrong results of hasMessageAvailable and readNext after 
seeking by timestamp (#22363)
    
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 149deaa5a79ed8570489bead4215ae213a4e9206)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 85 +++++++++++++++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 25 ++++---
 2 files changed, 90 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index d511c6dc37f..00c3eadb06a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -64,8 +65,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -75,7 +76,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "reader-sub";
 
-    @BeforeMethod
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
@@ -87,7 +88,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace("my-property/my-ns", 
Sets.newHashSet("test"));
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -146,21 +147,41 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
         testReadMessages(topic, true);
     }
 
-    @Test
-    public void testReadMessageWithBatchingWithMessageInclusive() throws 
Exception {
+    @DataProvider
+    public static Object[][] seekBeforeHasMessageAvailable() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(timeOut = 20000, dataProvider = "seekBeforeHasMessageAvailable")
+    public void testReadMessageWithBatchingWithMessageInclusive(boolean 
seekBeforeHasMessageAvailable)
+            throws Exception {
         String topic = 
"persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
         Set<String> keys = publishMessages(topic, 10, true);
 
         Reader<byte[]> reader = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
                                             
.startMessageIdInclusive().readerName(subscription).create();
 
-        while (reader.hasMessageAvailable()) {
-            Assert.assertTrue(keys.remove(reader.readNext().getKey()));
+        if (seekBeforeHasMessageAvailable) {
+            reader.seek(0L); // it should seek to the earliest
         }
+
+        assertTrue(reader.hasMessageAvailable());
+        final Message<byte[]> msg = reader.readNext();
+        assertTrue(keys.remove(msg.getKey()));
         // start from latest with start message inclusive should only read the 
last message in batch
         assertEquals(keys.size(), 9);
-        Assert.assertFalse(keys.contains("key9"));
-        Assert.assertFalse(reader.hasMessageAvailable());
+
+        final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+        if (seekBeforeHasMessageAvailable) {
+            assertEquals(msgId.getBatchIndex(), 0);
+            assertFalse(keys.contains("key0"));
+            assertTrue(reader.hasMessageAvailable());
+        } else {
+            assertEquals(msgId.getBatchIndex(), 9);
+            assertFalse(reader.hasMessageAvailable());
+            assertFalse(keys.contains("key9"));
+            assertFalse(reader.hasMessageAvailable());
+        }
     }
 
     private void testReadMessages(String topic, boolean enableBatch) throws 
Exception {
@@ -258,7 +279,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
     @Test
     public void testReaderWithTimeLong() throws Exception {
         String ns = "my-property/my-ns";
-        String topic = "persistent://" + ns + "/testReadFromPartition";
+        String topic = "persistent://" + ns + "/testReaderWithTimeLong";
         RetentionPolicies retention = new RetentionPolicies(-1, -1);
         admin.namespaces().setRetention(ns, retention);
 
@@ -788,4 +809,46 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
         producer.send("msg");
         assertTrue(reader.hasMessageAvailable());
     }
+
+    @Test(dataProvider = "initializeLastMessageIdInBroker")
+    public void testHasMessageAvailableAfterSeekTimestamp(boolean 
initializeLastMessageIdInBroker) throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-has-message-available-after-seek-timestamp";
+
+        @Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        final long timestampBeforeSend = System.currentTimeMillis();
+        final MessageId sentMsgId = producer.send("msg");
+
+        final List<MessageId> messageIds = new ArrayList<>();
+        messageIds.add(MessageId.earliest);
+        messageIds.add(sentMsgId);
+        messageIds.add(MessageId.latest);
+
+        for (MessageId messageId : messageIds) {
+            @Cleanup Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                    .startMessageId(messageId).create();
+            if (initializeLastMessageIdInBroker) {
+                if (messageId == MessageId.earliest) {
+                    assertTrue(reader.hasMessageAvailable());
+                } else {
+                    assertFalse(reader.hasMessageAvailable());
+                }
+            } // else: lastMessageIdInBroker is earliest
+            reader.seek(System.currentTimeMillis());
+            assertFalse(reader.hasMessageAvailable());
+        }
+
+        for (MessageId messageId : messageIds) {
+            @Cleanup Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                    .startMessageId(messageId).create();
+            if (initializeLastMessageIdInBroker) {
+                if (messageId == MessageId.earliest) {
+                    assertTrue(reader.hasMessageAvailable());
+                } else {
+                    assertFalse(reader.hasMessageAvailable());
+                }
+            } // else: lastMessageIdInBroker is earliest
+            reader.seek(timestampBeforeSend);
+            assertTrue(reader.hasMessageAvailable());
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index dae53c59d6e..468b5f7a8b8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -217,6 +217,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
+    private volatile boolean hasSoughtByTimestamp = false;
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
@@ -2235,7 +2236,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 new PulsarClientException("Only support seek by messageId or 
timestamp"));
     }
 
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId,
+                                                      Long seekTimestamp, 
String seekBy) {
         AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
         Backoff backoff = new BackoffBuilder()
                 .setInitialTime(100, TimeUnit.MILLISECONDS)
@@ -2252,11 +2254,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return FutureUtil.failedFuture(new IllegalStateException(message));
         }
         seekFuture = new CompletableFuture<>();
-        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
opTimeoutMs);
+        seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy, 
backoff, opTimeoutMs);
         return seekFuture;
     }
 
-    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId 
seekId, String seekBy,
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId 
seekId, Long seekTimestamp, String seekBy,
                                    final Backoff backoff, final AtomicLong 
remainingTime) {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx != null) {
@@ -2264,6 +2266,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             seekMessageId = (MessageIdAdv) seekId;
             log.info("[{}][{}] Seeking subscription to {}", topic, 
subscription, seekBy);
 
+            final boolean originalHasSoughtByTimestamp = hasSoughtByTimestamp;
+            hasSoughtByTimestamp = (seekTimestamp != null);
             cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
                 log.info("[{}][{}] Successfully reset subscription to {}", 
topic, subscription, seekBy);
                 acknowledgmentsGroupingTracker.flushAndClean();
@@ -2287,6 +2291,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
             }).exceptionally(e -> {
                 seekMessageId = originSeekMessageId;
+                hasSoughtByTimestamp = originalHasSoughtByTimestamp;
                 log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
 
                 failSeek(
@@ -2309,7 +2314,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.warn("[{}] [{}] Could not get connection while seek -- 
Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
-                seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
remainingTime);
+                seekAsyncInternal(requestId, seek, seekId, seekTimestamp, 
seekBy, backoff, remainingTime);
             }, nextDelay, TimeUnit.MILLISECONDS);
         }
     }
@@ -2326,7 +2331,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         String seekBy = String.format("the timestamp %d", timestamp);
         long requestId = client.newRequestId();
         return seekAsyncInternal(requestId, Commands.newSeek(consumerId, 
requestId, timestamp),
-                MessageId.earliest, seekBy);
+                MessageId.earliest, timestamp, seekBy);
     }
 
     @Override
@@ -2352,7 +2357,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
         }
-        return seekAsyncInternal(requestId, seek, messageId, seekBy);
+        return seekAsyncInternal(requestId, seek, messageId, null, seekBy);
     }
 
     public boolean hasMessageAvailable() throws PulsarClientException {
@@ -2372,13 +2377,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         // we haven't read yet. use startMessageId for comparison
         if (lastDequeuedMessageId == MessageId.earliest) {
+            // If the last seek is called with timestamp, startMessageId 
cannot represent the position to start, so we
+            // have to get the mark-delete position from the GetLastMessageId 
response to compare as well.
             // if we are starting from latest, we should seek to the actual 
last message first.
             // allow the last one to be read when read head inclusively.
-            if (MessageId.latest.equals(startMessageId)) {
-
+            final boolean hasSoughtByTimestamp = this.hasSoughtByTimestamp;
+            if (MessageId.latest.equals(startMessageId) || 
hasSoughtByTimestamp) {
                 CompletableFuture<GetLastMessageIdResponse> future = 
internalGetLastMessageIdAsync();
                 // if the consumer is configured to read inclusive then we 
need to seek to the last message
-                if (resetIncludeHead) {
+                if (resetIncludeHead && !hasSoughtByTimestamp) {
                     future = future.thenCompose((lastMessageIdResponse) ->
                             seekAsync(lastMessageIdResponse.lastMessageId)
                                     .thenApply((ignore) -> 
lastMessageIdResponse));

Reply via email to