This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d49415e Return message ID from compacted ledger while the compaction
cursor reach the end of the topic (#13533)
d49415e is described below
commit d49415e5a558ea9a82c93e55e77b2c3542eacb10
Author: lipenghui <[email protected]>
AuthorDate: Wed Dec 29 13:26:57 2021 +0800
Return message ID from compacted ledger while the compaction cursor reach
the end of the topic (#13533)
### Motivation
The problem happens when the compaction cursor has reached the end of the
topic, but the tail messages of the topic were removed during compaction
because the producer wrote null-value messages for their keys.
For example:
- 5 messages in the original topic with key: 0,1,2,3,4
- the corresponding message IDs are: 1:0, 1:1, 1:2, 1:3, 1:4
- the producer sends null-value messages for keys 3 and 4
- the topic compaction task is triggered
After the compaction task completes:
- 5 messages in the original topic: 1:0, 1:1, 1:2, 1:3, 1:4
- 3 messages in the compacted ledger: 1:0, 1:1, 1:2
At this point, if the reader asks for the last message ID of the topic,
we should return `1:2`, not `1:4`, because the reader cannot read the
messages with keys `3` and `4` from the compacted topic. Otherwise, the
`reader.readNext()` method will block until a new message is written to
the topic.
### Modifications
The fix is straightforward: when the broker receives a get-last-message-ID
request, it checks whether the compaction cursor has reached the end of
the original topic.
If so, it responds with the last message ID from the compacted ledger.
### Verifying this change
A new test, `testHasMessageAvailableWithNullValueMessage`, was added. It
ensures that `hasMessageAvailable()` returns false when there are no more
messages to read from the compacted topic and the compaction cursor has
reached the end of the topic.
---
.../apache/pulsar/broker/service/ServerCnx.java | 5 ++-
.../apache/pulsar/compaction/CompactedTopic.java | 2 +
.../pulsar/compaction/CompactedTopicImpl.java | 6 +--
.../pulsar/compaction/CompactedTopicTest.java | 48 +++++++++++++++++++++-
4 files changed, 55 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index deecb36..a7a7c2d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1729,7 +1729,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
// If it's not pointing to a valid entry, respond messageId of the
current position.
- if (lastPosition.getEntryId() == -1) {
+ // If the compaction cursor reach the end of the topic, respond
messageId from compacted ledger
+ Optional<Position> compactionHorizon =
persistentTopic.getCompactedTopic().getCompactionHorizon();
+ if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
+ && lastPosition.compareTo((PositionImpl)
compactionHorizon.get()) <= 0)) {
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId,
partitionIndex,
markDeletePosition);
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 31955a5..9e50fc0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
@@ -34,4 +35,5 @@ public interface CompactedTopic {
ReadEntriesCallback callback,
Consumer consumer);
CompletableFuture<Entry> readLastEntryOfCompactedLedger();
+ Optional<Position> getCompactionHorizon();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a6d6fc9..aac213f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.compaction;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
@@ -311,9 +310,8 @@ public class CompactedTopicImpl implements CompactedTopic {
.compare(p.getEntryId(), m.getEntryId()).result();
}
- @VisibleForTesting
- PositionImpl getCompactionHorizon() {
- return this.compactionHorizon;
+ public synchronized Optional<Position> getCompactionHorizon() {
+ return Optional.ofNullable(this.compactionHorizon);
}
private static final Logger log =
LoggerFactory.getLogger(CompactedTopicImpl.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index d44868a..4d00d28 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -254,7 +254,8 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
Assert.assertTrue(compactedTopic.getCompactedTopicContext().isPresent());
Assert.assertEquals(compactedTopic.getCompactedTopicContext().get().getLedger().getId(),
newCompactedLedger.getId());
- Assert.assertEquals(compactedTopic.getCompactionHorizon(), newHorizon);
+ Assert.assertTrue(compactedTopic.getCompactionHorizon().isPresent());
+ Assert.assertEquals(compactedTopic.getCompactionHorizon().get(),
newHorizon);
compactedTopic.deleteCompactedLedger(oldCompactedLedger.getId()).join();
// old ledger should be deleted, new still there
@@ -688,4 +689,49 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
Assert.assertFalse(reader.hasMessageAvailable());
}
+ @Test
+ public void testHasMessageAvailableWithNullValueMessage() throws Exception
{
+ String topic =
"persistent://my-property/use/my-ns/testHasMessageAvailable-" +
+ UUID.randomUUID();
+ final int numMessages = 10;
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .blockIfQueueFull(true)
+ .enableBatching(false)
+ .create();
+ CompletableFuture<MessageId> lastMessage = null;
+ for (int i = 0; i < numMessages; ++i) {
+ lastMessage = producer.newMessage().key(i +
"").value(String.format("msg [%d]", i)).sendAsync();
+ }
+
+ for (int i = numMessages / 2; i < numMessages; ++i) {
+ lastMessage = producer.newMessage().key(i +
"").value(null).sendAsync();
+ }
+ producer.flush();
+ lastMessage.join();
+ admin.topics().triggerCompaction(topic);
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
+ Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+ Assert.assertEquals(stats.compactedLedger.entries, numMessages /
2);
+ Assert.assertEquals(admin.topics().getStats(topic)
+
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+ Assert.assertEquals(stats.lastConfirmedEntry,
stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+ });
+
+ @Cleanup
+ Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic)
+ .startMessageIdInclusive()
+ .startMessageId(MessageId.earliest)
+ .readCompacted(true)
+ .create();
+ for (int i = numMessages / 2; i < numMessages; ++i) {
+ reader.readNext();
+ }
+ Assert.assertFalse(reader.hasMessageAvailable());
+ Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
+ }
+
}