This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 13b108b4455 [improve][broker] Support not retaining null-key message during topic compaction (#21578)
13b108b4455 is described below
commit 13b108b445545326f4440eae44498d3550936e67
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Dec 14 17:11:37 2023 +0800
[improve][broker] Support not retaining null-key message during topic compaction (#21578)
---
conf/broker.conf | 3 ++
conf/standalone.conf | 3 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/client/impl/RawBatchConverter.java | 19 ++++++--
.../pulsar/compaction/TwoPhaseCompactor.java | 22 +++++++--
.../pendingack/PendingAckPersistentTest.java | 2 +-
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++----------
7 files changed, 77 insertions(+), 34 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 9d7c68bc34e..29e434de7c1 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -507,6 +507,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 90f23a7915f..22ab0223e58 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1102,3 +1102,6 @@ zookeeperServers=
# Configuration Store connection string
# Deprecated: use configurationMetadataStoreUrl
configurationStoreServers=
+
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c0f724a5436..79454c0199b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2294,6 +2294,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Whether retain null-key message during topic compaction."
+ )
+ private boolean topicCompactionRemainNullKey = true;
+
@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index c096d361066..b3c9d7c9f2b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -90,6 +90,11 @@ public class RawBatchConverter {
return idsAndKeysAndSize;
}
+ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+ BiPredicate<String,
MessageId> filter) throws IOException {
+ return rebatchMessage(msg, filter, true);
+ }
+
/**
* Take a batched message and a filter, and returns a message with the
only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
@@ -97,7 +102,8 @@ public class RawBatchConverter {
* NOTE: this message does not alter the reference count of the
RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
- BiPredicate<String,
MessageId> filter)
+ BiPredicate<String,
MessageId> filter,
+ boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
@@ -125,9 +131,14 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
- messagesRetained++;
-
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-
singleMessagePayload, batchBuffer);
+ if (retainNullKey) {
+ messagesRetained++;
+
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+ singleMessagePayload, batchBuffer);
+ } else {
+
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+ Unpooled.EMPTY_BUFFER, batchBuffer);
+ }
} else if
(filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index ce7ed568148..831baffd7f2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY =
"CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
+ private final boolean topicCompactionRemainNullKey;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -69,6 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}
@Override
@@ -133,8 +135,16 @@ public class TwoPhaseCompactor extends Compactor {
int numMessagesInBatch =
metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e :
RawBatchConverter
- .extractIdsAndKeysAndSize(m, false)) {
+ .extractIdsAndKeysAndSize(m, true)) {
if (e != null) {
+ if (e.getMiddle() == null) {
+ if (!topicCompactionRemainNullKey) {
+ // record delete null-key message event
+ deleteCnt++;
+
mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ continue;
+ }
if (e.getRight() > 0) {
MessageId old =
latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
@@ -164,6 +174,10 @@ public class TwoPhaseCompactor extends Compactor {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
+ } else {
+ if (!topicCompactionRemainNullKey) {
+ deletedMessage = true;
+ }
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -253,7 +267,7 @@ public class TwoPhaseCompactor extends Compactor {
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
- m, (key, subid) ->
subid.equals(latestForKey.get(key)));
+ m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole
batch will be included in output",
id, ioe);
@@ -262,8 +276,8 @@ public class TwoPhaseCompactor extends Compactor {
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
- if (keyAndSize == null) { // pass through messages without
a key
- messageToAdd = Optional.of(m);
+ if (keyAndSize == null) {
+ messageToAdd = topicCompactionRemainNullKey ?
Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
&& msg.equals(id)) { // consider message only if
present into latestForKey map
if (keyAndSize.getRight() <= 0) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index f9047a849d5..2ef8b8f68ee 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -81,7 +81,7 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
- @BeforeMethod
+ @BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
setUpBase(1, NUM_PARTITIONS, PENDING_ACK_REPLAY_TOPIC, 0);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index de3f9ca525f..6e909a9ef02 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -570,7 +572,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
// compact the topic
Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
- compactor.compact(topic).join();
+ compactor.compact(topic).get();
// Read messages before compaction to get ids
List<Message<byte[]>> messages = new ArrayList<>();
@@ -628,8 +630,16 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test
- public void testKeyLessMessagesPassThrough() throws Exception {
+ @DataProvider(name = "retainNullKey")
+ public static Object[][] retainNullKey() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(dataProvider = "retainNullKey")
+ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws
Exception {
+ conf.setTopicCompactionRemainNullKey(retainNullKey);
+ restartBroker();
+
String topic = "persistent://my-property/use/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
@@ -659,29 +669,25 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
try (Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
- Message<byte[]> message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
-
- Message<byte[]> message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
-
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()),
"my-message-4");
-
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()),
"my-message-6");
-
- Message<byte[]> message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()),
"my-message-7");
+ List<Pair<String, String>> result = new
ArrayList<>();
+ while (true) {
+ Message<byte[]> message = consumer.receive(10,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ result.add(Pair.of(message.getKey(), message.getData() ==
null ? null : new String(message.getData())));
+ }
- Message<byte[]> message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()),
"my-message-8");
+ List<Pair<String, String>> expectList;
+ if (retainNullKey) {
+ expectList = Lists.newArrayList(
+ Pair.of(null, "my-message-1"), Pair.of(null,
"my-message-2"),
+ Pair.of("key1", "my-message-4"), Pair.of("key2",
"my-message-6"),
+ Pair.of(null, "my-message-7"), Pair.of(null,
"my-message-8"));
+ } else {
+ expectList = Lists.newArrayList(Pair.of("key1",
"my-message-4"), Pair.of("key2", "my-message-6"));
+ }
+ Assert.assertEquals(result, expectList);
}
}