This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 149deaa5a79 [fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363)
149deaa5a79 is described below
commit 149deaa5a79ed8570489bead4215ae213a4e9206
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Mar 27 19:49:27 2024 +0800
[fix][client] Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#22363)
Co-authored-by: Lari Hotari <[email protected]>
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 84 +++++++++++++++++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 25 ++++---
2 files changed, 89 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index cee3ea09968..2d3e8d4c6e9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -66,8 +66,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -77,7 +77,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "reader-sub";
- @BeforeMethod
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
@@ -89,7 +89,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace("my-property/my-ns",
Sets.newHashSet("test"));
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -198,21 +198,41 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
testReadMessages(topic, true);
}
- @Test
- public void testReadMessageWithBatchingWithMessageInclusive() throws
Exception {
+ @DataProvider
+ public static Object[][] seekBeforeHasMessageAvailable() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ @Test(timeOut = 20000, dataProvider = "seekBeforeHasMessageAvailable")
+ public void testReadMessageWithBatchingWithMessageInclusive(boolean
seekBeforeHasMessageAvailable)
+ throws Exception {
String topic =
"persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive";
Set<String> keys = publishMessages(topic, 10, true);
Reader<byte[]> reader =
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
.startMessageIdInclusive().readerName(subscription).create();
- while (reader.hasMessageAvailable()) {
- Assert.assertTrue(keys.remove(reader.readNext().getKey()));
+ if (seekBeforeHasMessageAvailable) {
+ reader.seek(0L); // it should seek to the earliest
}
+
+ assertTrue(reader.hasMessageAvailable());
+ final Message<byte[]> msg = reader.readNext();
+ assertTrue(keys.remove(msg.getKey()));
// start from latest with start message inclusive should only read the
last message in batch
assertEquals(keys.size(), 9);
- Assert.assertFalse(keys.contains("key9"));
- Assert.assertFalse(reader.hasMessageAvailable());
+
+ final MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+ if (seekBeforeHasMessageAvailable) {
+ assertEquals(msgId.getBatchIndex(), 0);
+ assertFalse(keys.contains("key0"));
+ assertTrue(reader.hasMessageAvailable());
+ } else {
+ assertEquals(msgId.getBatchIndex(), 9);
+ assertFalse(reader.hasMessageAvailable());
+ assertFalse(keys.contains("key9"));
+ assertFalse(reader.hasMessageAvailable());
+ }
}
private void testReadMessages(String topic, boolean enableBatch) throws
Exception {
@@ -310,7 +330,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
@Test
public void testReaderWithTimeLong() throws Exception {
String ns = "my-property/my-ns";
- String topic = "persistent://" + ns + "/testReadFromPartition";
+ String topic = "persistent://" + ns + "/testReaderWithTimeLong";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);
@@ -840,4 +860,46 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
producer.send("msg");
assertTrue(reader.hasMessageAvailable());
}
+
+ @Test(dataProvider = "initializeLastMessageIdInBroker")
+ public void testHasMessageAvailableAfterSeekTimestamp(boolean
initializeLastMessageIdInBroker) throws Exception {
+ final String topic =
"persistent://my-property/my-ns/test-has-message-available-after-seek-timestamp";
+
+ @Cleanup Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ final long timestampBeforeSend = System.currentTimeMillis();
+ final MessageId sentMsgId = producer.send("msg");
+
+ final List<MessageId> messageIds = new ArrayList<>();
+ messageIds.add(MessageId.earliest);
+ messageIds.add(sentMsgId);
+ messageIds.add(MessageId.latest);
+
+ for (MessageId messageId : messageIds) {
+ @Cleanup Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageId(messageId).create();
+ if (initializeLastMessageIdInBroker) {
+ if (messageId == MessageId.earliest) {
+ assertTrue(reader.hasMessageAvailable());
+ } else {
+ assertFalse(reader.hasMessageAvailable());
+ }
+ } // else: lastMessageIdInBroker is earliest
+ reader.seek(System.currentTimeMillis());
+ assertFalse(reader.hasMessageAvailable());
+ }
+
+ for (MessageId messageId : messageIds) {
+ @Cleanup Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageId(messageId).create();
+ if (initializeLastMessageIdInBroker) {
+ if (messageId == MessageId.earliest) {
+ assertTrue(reader.hasMessageAvailable());
+ } else {
+ assertFalse(reader.hasMessageAvailable());
+ }
+ } // else: lastMessageIdInBroker is earliest
+ reader.seek(timestampBeforeSend);
+ assertTrue(reader.hasMessageAvailable());
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6c2ded819a5..5a0e5de330d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -218,6 +218,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
+ private volatile boolean hasSoughtByTimestamp = false;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T>
conf,
@@ -2252,7 +2253,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
new PulsarClientException("Only support seek by messageId or
timestamp"));
}
- private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId, String seekBy) {
+ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId,
+ Long seekTimestamp,
String seekBy) {
AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
@@ -2269,11 +2271,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return FutureUtil.failedFuture(new IllegalStateException(message));
}
seekFuture = new CompletableFuture<>();
- seekAsyncInternal(requestId, seek, seekId, seekBy, backoff,
opTimeoutMs);
+ seekAsyncInternal(requestId, seek, seekId, seekTimestamp, seekBy,
backoff, opTimeoutMs);
return seekFuture;
}
- private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId
seekId, String seekBy,
+ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId
seekId, Long seekTimestamp, String seekBy,
final Backoff backoff, final AtomicLong
remainingTime) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
@@ -2281,6 +2283,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic,
subscription, seekBy);
+ final boolean originalHasSoughtByTimestamp = hasSoughtByTimestamp;
+ hasSoughtByTimestamp = (seekTimestamp != null);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}",
topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
@@ -2304,6 +2308,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}).exceptionally(e -> {
seekMessageId = originSeekMessageId;
+ hasSoughtByTimestamp = originalHasSoughtByTimestamp;
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
failSeek(
@@ -2326,7 +2331,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.warn("[{}] [{}] Could not get connection while seek --
Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
- seekAsyncInternal(requestId, seek, seekId, seekBy, backoff,
remainingTime);
+ seekAsyncInternal(requestId, seek, seekId, seekTimestamp,
seekBy, backoff, remainingTime);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}
@@ -2343,7 +2348,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
String seekBy = String.format("the timestamp %d", timestamp);
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId,
requestId, timestamp),
- MessageId.earliest, seekBy);
+ MessageId.earliest, timestamp, seekBy);
}
@Override
@@ -2369,7 +2374,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
- return seekAsyncInternal(requestId, seek, messageId, seekBy);
+ return seekAsyncInternal(requestId, seek, messageId, null, seekBy);
}
public boolean hasMessageAvailable() throws PulsarClientException {
@@ -2389,13 +2394,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
+ // If the last seek is called with timestamp, startMessageId
cannot represent the position to start, so we
+ // have to get the mark-delete position from the GetLastMessageId
response to compare as well.
// if we are starting from latest, we should seek to the actual
last message first.
// allow the last one to be read when read head inclusively.
- if (MessageId.latest.equals(startMessageId)) {
-
+ final boolean hasSoughtByTimestamp = this.hasSoughtByTimestamp;
+ if (MessageId.latest.equals(startMessageId) ||
hasSoughtByTimestamp) {
CompletableFuture<GetLastMessageIdResponse> future =
internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we
need to seek to the last message
- if (resetIncludeHead) {
+ if (resetIncludeHead && !hasSoughtByTimestamp) {
future = future.thenCompose((lastMessageIdResponse) ->
seekAsync(lastMessageIdResponse.lastMessageId)
.thenApply((ignore) ->
lastMessageIdResponse));