This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 96e4fd6 Make the compaction phase one loop timeout configurable
(#11206)
96e4fd6 is described below
commit 96e4fd69e5376fa2924517b045d02f8229679b92
Author: lipenghui <[email protected]>
AuthorDate: Sun Jul 4 08:48:43 2021 +0800
Make the compaction phase one loop timeout configurable (#11206)
---
conf/broker.conf | 4 ++++
.../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../org/apache/pulsar/compaction/TwoPhaseCompactor.java | 9 +++++++--
.../java/org/apache/pulsar/compaction/CompactorTest.java | 14 ++++++++++++++
4 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0efc656..26d33d7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60
# Using a value of 0, is disabling compression check.
brokerServiceCompactionThresholdInBytes=0
+# Timeout for the compaction phase one loop.
+# If the execution time of the compaction phase one loop exceeds this time,
the compaction will not proceed.
+brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+
# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b41f76f..4d10020 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1884,6 +1884,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private long brokerServiceCompactionThresholdInBytes = 0;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Timeout for the compaction phase one loop, If the execution
time of the compaction " +
+ "phase one loop exceeds this time, the compaction will not
proceed."
+ )
+ private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+
+ @FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
+ " - if a producer without a schema attempts to produce to a
topic with schema, the producer will be\n"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 800182e..0f0f981 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor {
private static final Logger log =
LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY =
"CompactedTopicLedger";
- public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT =
Duration.ofSeconds(10);
+ private final Duration phaseOneLoopReadTimeout;
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
+ phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}
@Override
@@ -116,7 +117,7 @@ public class TwoPhaseCompactor extends Compactor {
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
FutureUtil.addTimeoutHandling(future,
- PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+ phaseOneLoopReadTimeout, scheduler,
() -> FutureUtil.createTimeoutException("Timeout", getClass(),
"phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
@@ -399,4 +400,8 @@ public class TwoPhaseCompactor extends Compactor {
this.latestForKey = latestForKey;
}
}
+
+ public long getPhaseOneLoopReadTimeoutInSeconds() {
+ return phaseOneLoopReadTimeout.getSeconds();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 57a2146..0d1a95c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -39,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -220,6 +224,16 @@ public class CompactorTest extends
MockedPulsarServiceBaseTest {
compactor.compact(topic).get();
}
+ @Test
+ public void testPhaseOneLoopTimeConfiguration() {
+ ServiceConfiguration configuration = new ServiceConfiguration();
+ configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+ TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration,
Mockito.mock(PulsarClientImpl.class),
+ Mockito.mock(BookKeeper.class), compactionScheduler);
+ Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(),
60);
+
+ }
+
public ByteBuf extractPayload(RawMessage m) throws Exception {
ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
Commands.skipChecksumIfPresent(payloadAndMetadata);