This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 89b56a1abf0 [fix][broker] Fix typo in the config key (#21690)
89b56a1abf0 is described below
commit 89b56a1abf0fe703d348c8c864c6a14045fa0ec8
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Dec 8 04:23:30 2023 +0800
[fix][broker] Fix typo in the config key (#21690)
---
conf/broker.conf | 2 +-
conf/standalone.conf | 2 +-
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +-
.../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java | 12 ++++++------
.../java/org/apache/pulsar/compaction/CompactionTest.java | 5 ++---
5 files changed, 11 insertions(+), 12 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index bc9b644b221..2dbcff12b16 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -539,7 +539,7 @@ brokerServiceCompactionThresholdInBytes=0
brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
# Whether retain null-key message during topic compaction
-topicCompactionRemainNullKey=true
+topicCompactionRetainNullKey=true
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
diff --git a/conf/standalone.conf b/conf/standalone.conf
index b730bbc1290..0b486bdaf04 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -1280,4 +1280,4 @@ brokerInterceptors=
disableBrokerInterceptors=true
# Whether retain null-key message during topic compaction
-topicCompactionRemainNullKey=true
+topicCompactionRetainNullKey=true
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 912182ceba7..6175835826e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2781,7 +2781,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "Whether retain null-key message during topic compaction."
)
- private boolean topicCompactionRemainNullKey = true;
+ private boolean topicCompactionRetainNullKey = true;
@FieldContext(
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 5fa64e9f067..a78323a9cfe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -62,7 +62,7 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log =
LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private final Duration phaseOneLoopReadTimeout;
- private final boolean topicCompactionRemainNullKey;
+ private final boolean topicCompactionRetainNullKey;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -70,7 +70,7 @@ public class TwoPhaseCompactor extends Compactor {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
- topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey();
+ topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey();
}
@Override
@@ -137,7 +137,7 @@ public class TwoPhaseCompactor extends Compactor {
for (ImmutableTriple<MessageId, String, Integer> e :
extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getMiddle() == null) {
- if (!topicCompactionRemainNullKey) {
+ if (!topicCompactionRetainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
@@ -174,7 +174,7 @@ public class TwoPhaseCompactor extends Compactor {
latestForKey.remove(keyAndSize.getLeft());
}
} else {
- if (!topicCompactionRemainNullKey) {
+ if (!topicCompactionRetainNullKey) {
deletedMessage = true;
}
}
@@ -265,7 +265,7 @@ public class TwoPhaseCompactor extends Compactor {
if (RawBatchConverter.isReadableBatch(m)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
- m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey);
+ m, (key, subid) ->
subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole
batch will be included in output",
id, ioe);
@@ -275,7 +275,7 @@ public class TwoPhaseCompactor extends Compactor {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
MessageId msg;
if (keyAndSize == null) {
- messageToAdd = topicCompactionRemainNullKey ?
Optional.of(m) : Optional.empty();
+ messageToAdd = topicCompactionRetainNullKey ?
Optional.of(m) : Optional.empty();
} else if ((msg = latestForKey.get(keyAndSize.getLeft()))
!= null
&& msg.equals(id)) { // consider message only if
present into latestForKey map
if (keyAndSize.getRight() <= 0) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 5ee12d660e0..69baca3abc3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -26,7 +26,6 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
@@ -648,9 +647,9 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
@Test(dataProvider = "retainNullKey")
public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws
Exception {
- conf.setTopicCompactionRemainNullKey(retainNullKey);
+ conf.setTopicCompactionRetainNullKey(retainNullKey);
restartBroker();
- FieldUtils.writeDeclaredField(compactor,
"topicCompactionRemainNullKey", retainNullKey, true);
+ FieldUtils.writeDeclaredField(compactor,
"topicCompactionRetainNullKey", retainNullKey, true);
String topic = "persistent://my-property/use/my-ns/my-topic1";