This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new dd88f536645 [fix] [broker] Fix write all compacted out entry into
compacted topic (#21917)
dd88f536645 is described below
commit dd88f536645b04ded655726e98634336e7930edc
Author: thetumbled <[email protected]>
AuthorDate: Sun Jan 21 19:47:40 2024 +0800
[fix] [broker] Fix write all compacted out entry into compacted topic
(#21917)
---
.../pulsar/client/impl/RawBatchConverter.java | 6 ++-
.../apache/pulsar/compaction/CompactorTest.java | 56 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index b3c9d7c9f2b..94fbf3365b2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -130,7 +130,11 @@ public class RawBatchConverter {
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
- if (!singleMessageMetadata.hasPartitionKey()) {
+ if (singleMessageMetadata.isCompactedOut()) {
+ // we may read compacted out message from the compacted
topic
+
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+ Unpooled.EMPTY_BUFFER, batchBuffer);
+ } else if (!singleMessageMetadata.hasPartitionKey()) {
if (retainNullKey) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index e3a788f36c1..ab7878a8b7d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -35,6 +35,8 @@ import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -45,10 +47,15 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -160,6 +167,55 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
compactAndVerify(topic, expected, true);
}
+ @Test
+ public void testAllCompactedOut() throws Exception {
+ String topicName =
"persistent://my-property/use/my-ns/testAllCompactedOut";
+ // set retain null key to true
+ boolean oldRetainNullKey =
pulsar.getConfig().isTopicCompactionRetainNullKey();
+ pulsar.getConfig().setTopicCompactionRetainNullKey(true);
+ this.restartBroker();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+
.enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
+
+ producer.newMessage().key("K1").value("V1").sendAsync();
+ producer.newMessage().key("K2").value("V2").sendAsync();
+ producer.newMessage().key("K2").value(null).sendAsync();
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ producer.newMessage().key("K1").value(null).sendAsync();
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ @Cleanup
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .subscriptionName("reader-test")
+ .topic(topicName)
+ .readCompacted(true)
+ .startMessageId(MessageId.earliest)
+ .create();
+ while (reader.hasMessageAvailable()) {
+ Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ // set retain null key back to avoid affecting other tests
+ pulsar.getConfig().setTopicCompactionRetainNullKey(oldRetainNullKey);
+ }
+
@Test
public void testCompactAddCompact() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";