This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 744b7af5fc4 [improve][broker] Support not retaining null-key message
during topic compaction (#21578) (#21662)
744b7af5fc4 is described below
commit 744b7af5fc4b49ab975728006c67e29830dc0210
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Dec 4 19:54:30 2023 +0800
[improve][broker] Support not retaining null-key message during topic
compaction (#21578) (#21662)
---
conf/broker.conf | 3 ++
conf/standalone.conf | 3 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/client/impl/RawBatchConverter.java | 19 ++++++--
.../pulsar/compaction/TwoPhaseCompactor.java | 23 +++++++--
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++----------
6 files changed, 76 insertions(+), 34 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 89d2d852004..46af8530623 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -537,6 +537,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+# Whether to retain null-key messages during topic compaction (if false, they are dropped from the compacted topic)
+topicCompactionRemainNullKey=true
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 63bc7a29ae6..1f1910435dd 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1266,3 +1266,6 @@ delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000
# after reaching the max buckets limitation, the adjacent buckets will be merged.
# (disable with value -1)
delayedDeliveryMaxNumBuckets=-1
+
+# Whether to retain null-key messages during topic compaction (if false, they are dropped from the compacted topic)
+topicCompactionRemainNullKey=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2c48310f964..2b0d185ca55 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2744,6 +2744,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Whether retain null-key message during topic compaction."
+ )
+ private boolean topicCompactionRemainNullKey = true;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks
topic migrated "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 7f4e5dea331..1b1b2e3ebcd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -90,6 +90,11 @@ public class RawBatchConverter {
return idsAndKeysAndSize;
}
+ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+ BiPredicate<String,
MessageId> filter) throws IOException {
+ return rebatchMessage(msg, filter, true);
+ }
+
/**
* Take a batched message and a filter, and returns a message with the
only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
@@ -97,7 +102,8 @@ public class RawBatchConverter {
* NOTE: this message does not alter the reference count of the
RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
- BiPredicate<String,
MessageId> filter)
+ BiPredicate<String,
MessageId> filter,
+ boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
@@ -125,9 +131,14 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
- messagesRetained++;
-
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-
singleMessagePayload, batchBuffer);
+ if (retainNullKey) {
+ messagesRetained++;
+
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+ singleMessagePayload, batchBuffer);
+ } else {
+
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+ Unpooled.EMPTY_BUFFER, batchBuffer);
+ }
} else if
(filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index e82114d9741..f0aa95d40a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -61,6 +61,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final int MAX_OUTSTANDING = 500;
protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY =
"CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
+ private final boolean topicCompactionRemainNullKey;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -68,6 +69,7 @@ public class TwoPhaseCompactor extends Compactor {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}
@Override
@@ -132,8 +134,16 @@ public class TwoPhaseCompactor extends Compactor {
int numMessagesInBatch =
metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e :
RawBatchConverter
- .extractIdsAndKeysAndSize(m, false)) {
+ .extractIdsAndKeysAndSize(m, true)) {
if (e != null) {
+ if (e.getMiddle() == null) {
+ if (!topicCompactionRemainNullKey) {
+ // record delete null-key message event
+ deleteCnt++;
+
mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ continue;
+ }
if (e.getRight() > 0) {
MessageId old =
latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
@@ -163,6 +173,10 @@ public class TwoPhaseCompactor extends Compactor {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
+ } else {
+ if (!topicCompactionRemainNullKey) {
+ deletedMessage = true;
+ }
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -241,7 +255,6 @@ public class TwoPhaseCompactor extends Compactor {
}
if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
- m.close();
phaseTwoLoop(reader, to, latestForKey, lh, outstanding,
promise, lastCompactedMessageId);
return;
}
@@ -253,7 +266,7 @@ public class TwoPhaseCompactor extends Compactor {
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = RawBatchConverter.rebatchMessage(
- m, (key, subid) ->
subid.equals(latestForKey.get(key)));
+ m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole
batch will be included in output",
id, ioe);
@@ -262,8 +275,8 @@ public class TwoPhaseCompactor extends Compactor {
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
- if (keyAndSize == null) { // pass through messages without
a key
- messageToAdd = Optional.of(m);
+ if (keyAndSize == null) {
+ messageToAdd = topicCompactionRemainNullKey ?
Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
&& msg.equals(id)) { // consider message only if
present into latestForKey map
if (keyAndSize.getRight() <= 0) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 3985069c6eb..be8c368a1ee 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
+
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -643,8 +644,17 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test
- public void testKeyLessMessagesPassThrough() throws Exception {
+ @DataProvider(name = "retainNullKey")
+ public static Object[][] retainNullKey() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(dataProvider = "retainNullKey")
+ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws
Exception {
+ conf.setTopicCompactionRemainNullKey(retainNullKey);
+ restartBroker();
+ FieldUtils.writeDeclaredField(compactor,
"topicCompactionRemainNullKey", retainNullKey, true);
+
String topic = "persistent://my-property/use/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
@@ -685,29 +695,25 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
} else {
- Message<byte[]> message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
-
- Message<byte[]> message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
-
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()),
"my-message-4");
-
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()),
"my-message-6");
-
- Message<byte[]> message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()),
"my-message-7");
+ List<Pair<String, String>> result = new ArrayList<>();
+ while (true) {
+ Message<byte[]> message = consumer.receive(10,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ result.add(Pair.of(message.getKey(), message.getData() ==
null ? null : new String(message.getData())));
+ }
- Message<byte[]> message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()),
"my-message-8");
+ List<Pair<String, String>> expectList;
+ if (retainNullKey) {
+ expectList = List.of(
+ Pair.of(null, "my-message-1"), Pair.of(null,
"my-message-2"),
+ Pair.of("key1", "my-message-4"), Pair.of("key2",
"my-message-6"),
+ Pair.of(null, "my-message-7"), Pair.of(null,
"my-message-8"));
+ } else {
+ expectList = List.of(Pair.of("key1", "my-message-4"),
Pair.of("key2", "my-message-6"));
+ }
+ Assert.assertEquals(result, expectList);
}
}
}
@@ -1888,7 +1894,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.topic(topicName).create();
for (int i = 0; i < 10; i+=2) {
- producer.newMessage().key(null).value(new
byte[4*1024*1024]).send();
+ producer.newMessage().key(UUID.randomUUID().toString()).value(new
byte[4*1024*1024]).send();
}
producer.flush();