This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 13b108b4455 [improve][broker] Support not retaining null-key message 
during topic compaction (#21578)
13b108b4455 is described below

commit 13b108b445545326f4440eae44498d3550936e67
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Dec 14 17:11:37 2023 +0800

    [improve][broker] Support not retaining null-key message during topic 
compaction (#21578)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../pulsar/client/impl/RawBatchConverter.java      | 19 ++++++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 22 +++++++--
 .../pendingack/PendingAckPersistentTest.java       |  2 +-
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++----------
 7 files changed, 77 insertions(+), 34 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 9d7c68bc34e..29e434de7c1 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -507,6 +507,9 @@ brokerServiceCompactionThresholdInBytes=0
 # If the execution time of the compaction phase one loop exceeds this time, 
the compaction will not proceed.
 brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
 
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 90f23a7915f..22ab0223e58 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1102,3 +1102,6 @@ zookeeperServers=
 # Configuration Store connection string
 # Deprecated: use configurationMetadataStoreUrl
 configurationStoreServers=
+
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c0f724a5436..79454c0199b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2294,6 +2294,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Whether retain null-key message during topic compaction."
+    )
+    private boolean topicCompactionRemainNullKey = true;
+
     @FieldContext(
         category = CATEGORY_SCHEMA,
         doc = "Enforce schema validation on following cases:\n\n"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index c096d361066..b3c9d7c9f2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -90,6 +90,11 @@ public class RawBatchConverter {
         return idsAndKeysAndSize;
     }
 
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, 
MessageId> filter) throws IOException {
+        return rebatchMessage(msg, filter, true);
+    }
+
     /**
      * Take a batched message and a filter, and returns a message with the 
only the sub-messages
      * which match the filter. Returns an empty optional if no messages match.
@@ -97,7 +102,8 @@ public class RawBatchConverter {
      *  NOTE: this message does not alter the reference count of the 
RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
-                                                      BiPredicate<String, 
MessageId> filter)
+                                                      BiPredicate<String, 
MessageId> filter,
+                                                      boolean retainNullKey)
             throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
@@ -125,9 +131,14 @@ public class RawBatchConverter {
                                                       
msg.getMessageIdData().getPartition(),
                                                       i);
                 if (!singleMessageMetadata.hasPartitionKey()) {
-                    messagesRetained++;
-                    
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-                                                                      
singleMessagePayload, batchBuffer);
+                    if (retainNullKey) {
+                        messagesRetained++;
+                        
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+                                singleMessagePayload, batchBuffer);
+                    } else {
+                        
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+                                Unpooled.EMPTY_BUFFER, batchBuffer);
+                    }
                 } else if 
(filter.test(singleMessageMetadata.getPartitionKey(), id)
                            && singleMessagePayload.readableBytes() > 0) {
                     messagesRetained++;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index ce7ed568148..831baffd7f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
     private static final int MAX_OUTSTANDING = 500;
     private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = 
"CompactedTopicLedger";
     private final Duration phaseOneLoopReadTimeout;
+    private final boolean topicCompactionRemainNullKey;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
@@ -69,6 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
                              ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
         phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+        topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
     }
 
     @Override
@@ -133,8 +135,16 @@ public class TwoPhaseCompactor extends Compactor {
                         int numMessagesInBatch = 
metadata.getNumMessagesInBatch();
                         int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : 
RawBatchConverter
-                                .extractIdsAndKeysAndSize(m, false)) {
+                                .extractIdsAndKeysAndSize(m, true)) {
                             if (e != null) {
+                                if (e.getMiddle() == null) {
+                                    if (!topicCompactionRemainNullKey) {
+                                        // record delete null-key message event
+                                        deleteCnt++;
+                                        
mxBean.addCompactionRemovedEvent(reader.getTopic());
+                                    }
+                                    continue;
+                                }
                                 if (e.getRight() > 0) {
                                     MessageId old = 
latestForKey.put(e.getMiddle(), e.getLeft());
                                     if (old != null) {
@@ -164,6 +174,10 @@ public class TwoPhaseCompactor extends Compactor {
                             deletedMessage = true;
                             latestForKey.remove(keyAndSize.getLeft());
                         }
+                    } else {
+                        if (!topicCompactionRemainNullKey) {
+                            deletedMessage = true;
+                        }
                     }
                     if (replaceMessage || deletedMessage) {
                         mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -253,7 +267,7 @@ public class TwoPhaseCompactor extends Compactor {
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         messageToAdd = RawBatchConverter.rebatchMessage(
-                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)));
+                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
                                 id, ioe);
@@ -262,8 +276,8 @@ public class TwoPhaseCompactor extends Compactor {
                 } else {
                     Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                     MessageId msg;
-                    if (keyAndSize == null) { // pass through messages without 
a key
-                        messageToAdd = Optional.of(m);
+                    if (keyAndSize == null) {
+                        messageToAdd = topicCompactionRemainNullKey ? 
Optional.of(m) : Optional.empty();
                     } else if ((msg = latestForKey.get(keyAndSize.getLeft())) 
!= null
                             && msg.equals(id)) { // consider message only if 
present into latestForKey map
                         if (keyAndSize.getRight() <= 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index f9047a849d5..2ef8b8f68ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -81,7 +81,7 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
 
     private static final int NUM_PARTITIONS = 16;
 
-    @BeforeMethod
+    @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index de3f9ca525f..6e909a9ef02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
@@ -570,7 +572,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         // compact the topic
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
-        compactor.compact(topic).join();
+        compactor.compact(topic).get();
 
         // Read messages before compaction to get ids
         List<Message<byte[]>> messages = new ArrayList<>();
@@ -628,8 +630,16 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test
-    public void testKeyLessMessagesPassThrough() throws Exception {
+    @DataProvider(name = "retainNullKey")
+    public static Object[][] retainNullKey() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(dataProvider = "retainNullKey")
+    public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws 
Exception {
+        conf.setTopicCompactionRemainNullKey(retainNullKey);
+        restartBroker();
+
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
@@ -659,29 +669,25 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
 
         try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("sub1").readCompacted(true).subscribe()){
-            Message<byte[]> message1 = consumer.receive();
-            Assert.assertFalse(message1.hasKey());
-            Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
-
-            Message<byte[]> message2 = consumer.receive();
-            Assert.assertFalse(message2.hasKey());
-            Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
-
-            Message<byte[]> message3 = consumer.receive();
-            Assert.assertEquals(message3.getKey(), "key1");
-            Assert.assertEquals(new String(message3.getData()), 
"my-message-4");
-
-            Message<byte[]> message4 = consumer.receive();
-            Assert.assertEquals(message4.getKey(), "key2");
-            Assert.assertEquals(new String(message4.getData()), 
"my-message-6");
-
-            Message<byte[]> message5 = consumer.receive();
-            Assert.assertFalse(message5.hasKey());
-            Assert.assertEquals(new String(message5.getData()), 
"my-message-7");
+                            List<Pair<String, String>> result = new 
ArrayList<>();
+                while (true) {
+                    Message<byte[]> message = consumer.receive(10, 
TimeUnit.SECONDS);
+                    if (message == null) {
+                        break;
+                    }
+                    result.add(Pair.of(message.getKey(), message.getData() == 
null ? null : new String(message.getData())));
+                }
 
-            Message<byte[]> message6 = consumer.receive();
-            Assert.assertFalse(message6.hasKey());
-            Assert.assertEquals(new String(message6.getData()), 
"my-message-8");
+                List<Pair<String, String>> expectList;
+                if (retainNullKey) {
+                    expectList = Lists.newArrayList(
+                        Pair.of(null, "my-message-1"), Pair.of(null, 
"my-message-2"),
+                        Pair.of("key1", "my-message-4"), Pair.of("key2", 
"my-message-6"),
+                        Pair.of(null, "my-message-7"), Pair.of(null, 
"my-message-8"));
+                } else {
+                    expectList = Lists.newArrayList(Pair.of("key1", 
"my-message-4"), Pair.of("key2", "my-message-6"));
+                }
+                Assert.assertEquals(result, expectList);
         }
     }
 

Reply via email to