This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new 2b5fd9fd44d [fix][broker] Fix typo in the config key (#21690) 2b5fd9fd44d is described below commit 2b5fd9fd44d7eac239a6b3cfcf43ad22523e1282 Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Fri Dec 8 04:23:30 2023 +0800 [fix][broker] Fix typo in the config key (#21690) (cherry picked from commit 822b7f434e062989eb2dbc84dc2ca50fc0e6b134) --- conf/broker.conf | 2 +- conf/standalone.conf | 2 +- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../java/org/apache/pulsar/compaction/TwoPhaseCompactor.java | 12 ++++++------ .../java/org/apache/pulsar/compaction/CompactionTest.java | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 29c76b538a3..1b3bdd1f91f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -526,7 +526,7 @@ brokerServiceCompactionThresholdInBytes=0 brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 # Whether retain null-key message during topic compaction -topicCompactionRemainNullKey=true +topicCompactionRetainNullKey=true # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will diff --git a/conf/standalone.conf b/conf/standalone.conf index a45d0e98d96..52da27a3d10 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1199,4 +1199,4 @@ configurationStoreServers= managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1 # Whether retain null-key message during topic compaction -topicCompactionRemainNullKey=true +topicCompactionRetainNullKey=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 88e052abe98..d7fa1eff1b2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2476,7 +2476,7 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, doc = "Whether retain null-key message during topic compaction." ) - private boolean topicCompactionRemainNullKey = true; + private boolean topicCompactionRetainNullKey = true; @FieldContext( category = CATEGORY_SCHEMA, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 831baffd7f2..59e1e39f3e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -62,7 +62,7 @@ public class TwoPhaseCompactor extends Compactor { private static final int MAX_OUTSTANDING = 500; private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; private final Duration phaseOneLoopReadTimeout; - private final boolean topicCompactionRemainNullKey; + private final boolean topicCompactionRetainNullKey; public TwoPhaseCompactor(ServiceConfiguration conf, PulsarClient pulsar, @@ -70,7 +70,7 @@ public class TwoPhaseCompactor extends Compactor { ScheduledExecutorService scheduler) { super(conf, pulsar, bk, scheduler); phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); - topicCompactionRemainNullKey = conf.isTopicCompactionRemainNullKey(); + topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey(); } @Override @@ -138,7 +138,7 @@ public class TwoPhaseCompactor extends Compactor { .extractIdsAndKeysAndSize(m, true)) { if (e != null) { if (e.getMiddle() == null) { - if (!topicCompactionRemainNullKey) { + if (!topicCompactionRetainNullKey) { // record delete null-key message event deleteCnt++; mxBean.addCompactionRemovedEvent(reader.getTopic()); @@ -175,7 +175,7 @@ public class TwoPhaseCompactor extends Compactor { latestForKey.remove(keyAndSize.getLeft()); } } else { - if (!topicCompactionRemainNullKey) { + if (!topicCompactionRetainNullKey) { deletedMessage = true; } } @@ -267,7 +267,7 @@ public class TwoPhaseCompactor extends Compactor { if (RawBatchConverter.isReadableBatch(m)) { try { messageToAdd = RawBatchConverter.rebatchMessage( - m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRemainNullKey); + m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey); } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe); @@ -277,7 +277,7 @@ public class TwoPhaseCompactor extends Compactor { Pair<String, Integer> keyAndSize = extractKeyAndSize(m); MessageId msg; if (keyAndSize == null) { - messageToAdd = topicCompactionRemainNullKey ? Optional.of(m) : Optional.empty(); + messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty(); } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null && msg.equals(id)) { // consider message only if present into latestForKey map if (keyAndSize.getRight() <= 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 006dce0c185..6792d4a4e2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -640,7 +640,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { @Test(dataProvider = "retainNullKey") public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception { - conf.setTopicCompactionRemainNullKey(retainNullKey); + conf.setTopicCompactionRetainNullKey(retainNullKey); restartBroker(); String topic = "persistent://my-property/use/my-ns/my-topic1";