This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e1318b2c1e6 [improve][broker][PIP-318] Support not retaining null-key
message during topic compaction (#21578)
e1318b2c1e6 is described below
commit e1318b2c1e6d528c574f02c4edfe80b05cf8e340
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Nov 20 16:27:15 2023 +0800
[improve][broker][PIP-318] Support not retaining null-key message during
topic compaction (#21578)
---
conf/broker.conf | 3 ++
conf/standalone.conf | 5 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/client/impl/RawBatchConverter.java | 25 +++++++---
.../pulsar/compaction/TwoPhaseCompactor.java | 30 +++++++++---
.../apache/pulsar/compaction/CompactionTest.java | 56 ++++++++++++----------
6 files changed, 86 insertions(+), 39 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index a043f379ed4..6bef17350ee 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -538,6 +538,9 @@ brokerServiceCompactionThresholdInBytes=0
# If the execution time of the compaction phase one loop exceeds this time,
the compaction will not proceed.
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=false
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 43455966c97..30de267b5c7 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1285,4 +1285,7 @@ brokerInterceptorsDirectory=./interceptors
brokerInterceptors=
# Enable or disable the broker interceptor, which is only used for testing for
now
-disableBrokerInterceptors=true
\ No newline at end of file
+disableBrokerInterceptors=true
+
+# Whether to retain null-key messages during topic compaction
+topicCompactionRemainNullKey=false
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 27b302ea839..06d26ad6808 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2772,6 +2772,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Whether retain null-key message during topic compaction."
+ )
+ private boolean topicCompactionRemainNullKey = false;
+
@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if cluster is migrated and marks
topic migrated "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 5f476df68af..dfa65d19953 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -75,10 +75,10 @@ public class RawBatchConverter {
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
- if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
+ if (!smm.isCompactedOut()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
- smm.getPartitionKey(),
- smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
+ smm.hasPartitionKey() ? smm.getPartitionKey() :
null,
+ smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
}
singleMessagePayload.release();
}
@@ -86,6 +86,11 @@ public class RawBatchConverter {
return idsAndKeysAndSize;
}
+ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+ BiPredicate<String,
MessageId> filter) throws IOException {
+ return rebatchMessage(msg, filter, true);
+ }
+
/**
* Take a batched message and a filter, and returns a message with the
only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
@@ -93,7 +98,8 @@ public class RawBatchConverter {
* NOTE: this message does not alter the reference count of the
RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
- BiPredicate<String,
MessageId> filter)
+ BiPredicate<String,
MessageId> filter,
+ boolean retainNullKey)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
@@ -129,9 +135,14 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadata.hasPartitionKey()) {
- messagesRetained++;
-
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
-
singleMessagePayload, batchBuffer);
+ if (retainNullKey) {
+ messagesRetained++;
+
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
+ singleMessagePayload, batchBuffer);
+ } else {
+
Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+ Unpooled.EMPTY_BUFFER, batchBuffer);
+ }
} else if
(filter.test(singleMessageMetadata.getPartitionKey(), id)
&& singleMessagePayload.readableBytes() > 0) {
messagesRetained++;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index a6fa92b8c56..cb39cc93154 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,6 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log =
LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;
+ private final boolean topicCompactionRemainNullKey;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -69,6 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
}
@Override
@@ -134,6 +136,14 @@ public class TwoPhaseCompactor extends Compactor {
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e :
extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
+ if (e.getMiddle() == null) {
+ if (!topicCompactionRemainNullKey) {
+ // record the removal of this null-key message as a compaction event
+ deleteCnt++;
+
mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ continue;
+ }
if (e.getRight() > 0) {
MessageId old =
latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
@@ -163,6 +173,10 @@ public class TwoPhaseCompactor extends Compactor {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
+ } else {
+ if (!topicCompactionRemainNullKey) {
+ deletedMessage = true;
+ }
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -249,8 +263,8 @@ public class TwoPhaseCompactor extends Compactor {
mxBean.addCompactionReadOp(reader.getTopic(),
m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
- messageToAdd = rebatchMessage(
- m, (key, subid) ->
subid.equals(latestForKey.get(key)));
+ messageToAdd = rebatchMessage(reader.getTopic(),
+ m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole
batch will be included in output",
id, ioe);
@@ -259,8 +273,8 @@ public class TwoPhaseCompactor extends Compactor {
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
- if (keyAndSize == null) { // pass through messages without
a key
- messageToAdd = Optional.of(m);
+ if (keyAndSize == null) {
+ messageToAdd = topicCompactionRemainNullKey ?
Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
&& msg.equals(id)) { // consider message only if
present into latestForKey map
if (keyAndSize.getRight() <= 0) {
@@ -419,9 +433,13 @@ public class TwoPhaseCompactor extends Compactor {
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
}
- protected Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
+ protected Optional<RawMessage> rebatchMessage(String topic, RawMessage
msg, BiPredicate<String, MessageId> filter,
+ boolean retainNullKey)
throws IOException {
- return RawBatchConverter.rebatchMessage(msg, filter);
+ if (log.isDebugEnabled()) {
+ log.debug("Rebatching message {} for topic {}",
msg.getMessageId(), topic);
+ }
+ return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
}
private static class PhaseOneResult {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 52837cbdcd5..5ee12d660e0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -640,8 +641,17 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test
- public void testKeyLessMessagesPassThrough() throws Exception {
+ @DataProvider(name = "retainNullKey")
+ public static Object[][] retainNullKey() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(dataProvider = "retainNullKey")
+ public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws
Exception {
+ conf.setTopicCompactionRemainNullKey(retainNullKey);
+ restartBroker();
+ FieldUtils.writeDeclaredField(compactor,
"topicCompactionRemainNullKey", retainNullKey, true);
+
String topic = "persistent://my-property/use/my-ns/my-topic1";
// subscribe before sending anything, so that we get all messages
@@ -682,29 +692,25 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
assertNull(m);
} else {
- Message<byte[]> message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()),
"my-message-1");
-
- Message<byte[]> message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()),
"my-message-2");
-
- Message<byte[]> message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()),
"my-message-4");
-
- Message<byte[]> message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()),
"my-message-6");
-
- Message<byte[]> message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()),
"my-message-7");
+ List<Pair<String, String>> result = new ArrayList<>();
+ while (true) {
+ Message<byte[]> message = consumer.receive(10,
TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ result.add(Pair.of(message.getKey(), message.getData() ==
null ? null : new String(message.getData())));
+ }
- Message<byte[]> message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()),
"my-message-8");
+ List<Pair<String, String>> expectList;
+ if (retainNullKey) {
+ expectList = List.of(
+ Pair.of(null, "my-message-1"), Pair.of(null,
"my-message-2"),
+ Pair.of("key1", "my-message-4"), Pair.of("key2",
"my-message-6"),
+ Pair.of(null, "my-message-7"), Pair.of(null,
"my-message-8"));
+ } else {
+ expectList = List.of(Pair.of("key1", "my-message-4"),
Pair.of("key2", "my-message-6"));
+ }
+ Assert.assertEquals(result, expectList);
}
}
}
@@ -1885,7 +1891,7 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
.topic(topicName).create();
for (int i = 0; i < 10; i+=2) {
- producer.newMessage().key(null).value(new
byte[4*1024*1024]).send();
+ producer.newMessage().key(UUID.randomUUID().toString()).value(new
byte[4*1024*1024]).send();
}
producer.flush();