This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fb60e9189f0 [fix][broker] Fix message loss during topic compaction (#20980)
fb60e9189f0 is described below
commit fb60e9189f06af75facca2fb7eab7ffe05109a77
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Aug 14 20:33:29 2023 +0800
[fix][broker] Fix message loss during topic compaction (#20980)
(cherry picked from commit 3ab420cd81c31ebd16213e14580d9e317bc0698d)
---
.../pulsar/client/impl/RawBatchConverter.java | 8 +++-
.../pulsar/compaction/TwoPhaseCompactor.java | 18 ++++---
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4d718f71a2e..0436ebdeed4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -42,6 +42,10 @@ public class RawBatchConverter {
public static boolean isReadableBatch(RawMessage msg) {
ByteBuf payload = msg.getHeadersAndPayload();
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+ return isReadableBatch(metadata);
+ }
+
+ public static boolean isReadableBatch(MessageMetadata metadata) {
return metadata.hasNumMessagesInBatch() &&
metadata.getEncryptionKeysCount() == 0;
}
@@ -69,9 +73,9 @@ public class RawBatchConverter {
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
- if (!smm.isCompactedOut()) {
+ if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
- smm.hasPartitionKey() ? smm.getPartitionKey() : null,
+ smm.getPartitionKey(),
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
}
singleMessagePayload.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 821dd9c0c9d..a8c4cbe1e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -126,22 +126,28 @@ public class TwoPhaseCompactor extends Compactor {
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
- if (RawBatchConverter.isReadableBatch(m)) {
+ MessageMetadata metadata =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+ if (RawBatchConverter.isReadableBatch(metadata)) {
try {
+ int numMessagesInBatch =
metadata.getNumMessagesInBatch();
+ int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e :
RawBatchConverter
.extractIdsAndKeysAndSize(m)) {
if (e != null) {
if (e.getRight() > 0) {
MessageId old =
latestForKey.put(e.getMiddle(), e.getLeft());
- replaceMessage = old != null;
+ if (old != null) {
+
mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
} else {
- deletedMessage = true;
latestForKey.remove(e.getMiddle());
+ deleteCnt++;
+
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
- if (replaceMessage || deletedMessage) {
-
mxBean.addCompactionRemovedEvent(reader.getTopic());
- }
+ }
+ if (deleteCnt == numMessagesInBatch) {
+ deletedMessage = true;
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole
batch will be included in output",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c8105b01125..c5dbd9c49aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -96,6 +97,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
+@Slf4j
public class CompactionTest extends MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
@@ -553,6 +555,60 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testBatchMessageWithNullValue() throws Exception {
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .receiverQueueSize(1).readCompacted(true).subscribe().close();
+
+ try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
+ .maxPendingMessages(3)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create()
+ ) {
+ // batch 1
+
producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
+ producer.newMessage().key("key1").value(null).sendAsync();
+
producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
+
+ // batch 2
+
producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync();
+
producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync();
+
producer.newMessage().key("key3").value("my-message-6".getBytes()).send();
+
+ // batch 3
+
producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync();
+ producer.newMessage().key("key4").value(null).sendAsync();
+
producer.newMessage().key("key5").value("my-message-9".getBytes()).send();
+ }
+
+
+ // compact the topic
+ compact(topic);
+
+ // Read messages before compaction to get ids
+ List<Message<byte[]>> messages = new ArrayList<>();
+ try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
+
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe())
{
+ while (true) {
+ Message<byte[]> message = consumer.receive(5,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message);
+ }
+ }
+
+ assertEquals(messages.size(), 3);
+ assertEquals(messages.get(0).getKey(), "key2");
+ assertEquals(messages.get(1).getKey(), "key3");
+ assertEquals(messages.get(2).getKey(), "key5");
+ }
+
@Test
public void testWholeBatchCompactedOut() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";