This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 744b7af5fc4 [improve][broker] Support not retaining null-key message 
during topic compaction (#21578) (#21662)
744b7af5fc4 is described below

commit 744b7af5fc4b49ab975728006c67e29830dc0210
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Dec 4 19:54:30 2023 +0800

    [improve][broker] Support not retaining null-key message during topic 
compaction (#21578) (#21662)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../pulsar/client/impl/RawBatchConverter.java      | 19 ++++++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 23 +++++++--
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++----------
 6 files changed, 76 insertions(+), 34 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 89d2d852004..46af8530623 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -537,6 +537,9 @@ brokerServiceCompactionThresholdInBytes=0
 # If the execution time of the compaction phase one loop exceeds this time, 
the compaction will not proceed.
 brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
 
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 63bc7a29ae6..1f1910435dd 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1266,3 +1266,6 @@ delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
 # after reaching the max buckets limitation, the adjacent buckets will be 
merged.
 # (disable with value -1)
 delayedDeliveryMaxNumBuckets=-1
+
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2c48310f964..2b0d185ca55 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2744,6 +2744,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Whether retain null-key message during topic compaction."
+    )
+    private boolean topicCompactionRemainNullKey = true;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Interval between checks to see if cluster is migrated and marks 
topic migrated "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 7f4e5dea331..1b1b2e3ebcd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -90,6 +90,11 @@ public class RawBatchConverter {
         return idsAndKeysAndSize;
     }
 
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, 
MessageId> filter) throws IOException {
+        return rebatchMessage(msg, filter, true);
+    }
+
     /**
      * Take a batched message and a filter, and returns a message with the 
only the sub-messages
      * which match the filter. Returns an empty optional if no messages match.
@@ -97,7 +102,8 @@ public class RawBatchConverter {
      *  NOTE: this message does not alter the reference count of the 
RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
-                                                      BiPredicate<String, 
MessageId> filter)
+                                                      BiPredicate<String, 
MessageId> filter,
+                                                      boolean retainNullKey)
             throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
@@ -125,9 +131,14 @@ public class RawBatchConverter {
                                                       
msg.getMessageIdData().getPartition(),
                                                       i);
                 if (!singleMessageMetadata.hasPartitionKey()) {
-                    messagesRetained++;
-                    
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-                                                                      
singleMessagePayload, batchBuffer);
+                    if (retainNullKey) {
+                        messagesRetained++;
+                        
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+                                singleMessagePayload, batchBuffer);
+                    } else {
+                        
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+                                Unpooled.EMPTY_BUFFER, batchBuffer);
+                    }
                 } else if 
(filter.test(singleMessageMetadata.getPartitionKey(), id)
                            && singleMessagePayload.readableBytes() > 0) {
                     messagesRetained++;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index e82114d9741..f0aa95d40a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -61,6 +61,7 @@ public class TwoPhaseCompactor extends Compactor {
     private static final int MAX_OUTSTANDING = 500;
     protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = 
"CompactedTopicLedger";
     private final Duration phaseOneLoopReadTimeout;
+    private final boolean topicCompactionRemainNullKey;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
@@ -68,6 +69,7 @@ public class TwoPhaseCompactor extends Compactor {
                              ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
         phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+        topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
     }
 
     @Override
@@ -132,8 +134,16 @@ public class TwoPhaseCompactor extends Compactor {
                         int numMessagesInBatch = 
metadata.getNumMessagesInBatch();
                         int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : 
RawBatchConverter
-                                .extractIdsAndKeysAndSize(m, false)) {
+                                .extractIdsAndKeysAndSize(m, true)) {
                             if (e != null) {
+                                if (e.getMiddle() == null) {
+                                    if (!topicCompactionRemainNullKey) {
+                                        // record delete null-key message event
+                                        deleteCnt++;
+                                        
mxBean.addCompactionRemovedEvent(reader.getTopic());
+                                    }
+                                    continue;
+                                }
                                 if (e.getRight() > 0) {
                                     MessageId old = 
latestForKey.put(e.getMiddle(), e.getLeft());
                                     if (old != null) {
@@ -163,6 +173,10 @@ public class TwoPhaseCompactor extends Compactor {
                             deletedMessage = true;
                             latestForKey.remove(keyAndSize.getLeft());
                         }
+                    } else {
+                        if (!topicCompactionRemainNullKey) {
+                            deletedMessage = true;
+                        }
                     }
                     if (replaceMessage || deletedMessage) {
                         mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -241,7 +255,6 @@ public class TwoPhaseCompactor extends Compactor {
             }
 
             if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
-                m.close();
                 phaseTwoLoop(reader, to, latestForKey, lh, outstanding, 
promise, lastCompactedMessageId);
                 return;
             }
@@ -253,7 +266,7 @@ public class TwoPhaseCompactor extends Compactor {
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         messageToAdd = RawBatchConverter.rebatchMessage(
-                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)));
+                                m, (key, subid) -> 
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
                                 id, ioe);
@@ -262,8 +275,8 @@ public class TwoPhaseCompactor extends Compactor {
                 } else {
                     Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                     MessageId msg;
-                    if (keyAndSize == null) { // pass through messages without 
a key
-                        messageToAdd = Optional.of(m);
+                    if (keyAndSize == null) {
+                        messageToAdd = topicCompactionRemainNullKey ? 
Optional.of(m) : Optional.empty();
                     } else if ((msg = latestForKey.get(keyAndSize.getLeft())) 
!= null
                             && msg.equals(id)) { // consider message only if 
present into latestForKey map
                         if (keyAndSize.getRight() <= 0) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 3985069c6eb..be8c368a1ee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
@@ -643,8 +644,17 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test
-    public void testKeyLessMessagesPassThrough() throws Exception {
+    @DataProvider(name = "retainNullKey")
+    public static Object[][] retainNullKey() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(dataProvider = "retainNullKey")
+    public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws 
Exception {
+        conf.setTopicCompactionRemainNullKey(retainNullKey);
+        restartBroker();
+        FieldUtils.writeDeclaredField(compactor, 
"topicCompactionRemainNullKey", retainNullKey, true);
+
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
         // subscribe before sending anything, so that we get all messages
@@ -685,29 +695,25 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                 assertNull(m);
             } else {
-                Message<byte[]> message1 = consumer.receive();
-                Assert.assertFalse(message1.hasKey());
-                Assert.assertEquals(new String(message1.getData()), 
"my-message-1");
-
-                Message<byte[]> message2 = consumer.receive();
-                Assert.assertFalse(message2.hasKey());
-                Assert.assertEquals(new String(message2.getData()), 
"my-message-2");
-
-                Message<byte[]> message3 = consumer.receive();
-                Assert.assertEquals(message3.getKey(), "key1");
-                Assert.assertEquals(new String(message3.getData()), 
"my-message-4");
-
-                Message<byte[]> message4 = consumer.receive();
-                Assert.assertEquals(message4.getKey(), "key2");
-                Assert.assertEquals(new String(message4.getData()), 
"my-message-6");
-
-                Message<byte[]> message5 = consumer.receive();
-                Assert.assertFalse(message5.hasKey());
-                Assert.assertEquals(new String(message5.getData()), 
"my-message-7");
+                List<Pair<String, String>> result = new ArrayList<>();
+                while (true) {
+                    Message<byte[]> message = consumer.receive(10, 
TimeUnit.SECONDS);
+                    if (message == null) {
+                        break;
+                    }
+                    result.add(Pair.of(message.getKey(), message.getData() == 
null ? null : new String(message.getData())));
+                }
 
-                Message<byte[]> message6 = consumer.receive();
-                Assert.assertFalse(message6.hasKey());
-                Assert.assertEquals(new String(message6.getData()), 
"my-message-8");
+                List<Pair<String, String>> expectList;
+                if (retainNullKey) {
+                    expectList = List.of(
+                        Pair.of(null, "my-message-1"), Pair.of(null, 
"my-message-2"),
+                        Pair.of("key1", "my-message-4"), Pair.of("key2", 
"my-message-6"),
+                        Pair.of(null, "my-message-7"), Pair.of(null, 
"my-message-8"));
+                } else {
+                    expectList = List.of(Pair.of("key1", "my-message-4"), 
Pair.of("key2", "my-message-6"));
+                }
+                Assert.assertEquals(result, expectList);
             }
         }
     }
@@ -1888,7 +1894,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 .topic(topicName).create();
 
         for (int i = 0; i < 10; i+=2) {
-            producer.newMessage().key(null).value(new 
byte[4*1024*1024]).send();
+            producer.newMessage().key(UUID.randomUUID().toString()).value(new 
byte[4*1024*1024]).send();
         }
         producer.flush();
 

Reply via email to