This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5373d829a9 [improve][java-client] Add verification for configured
default backlog quota and retention (#15441)
d5373d829a9 is described below
commit d5373d829a9fd2d4e962bc8203c61ffbe087be39
Author: lipenghui <[email protected]>
AuthorDate: Thu May 5 19:13:27 2022 +0800
[improve][java-client] Add verification for configured default backlog
quota and retention (#15441)
---
.../org/apache/pulsar/broker/PulsarService.java | 20 ++++++
.../apache/pulsar/broker/PulsarServiceTest.java | 75 ++++++++++++++++++++++
2 files changed, 95 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3c4aa25ddf3..372ca4e4250 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -652,6 +652,26 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
+ "authenticationEnabled=true when authorization is
enabled with authorizationEnabled=true.");
}
+ if (config.getDefaultRetentionSizeInMB() > 0
+ && config.getBacklogQuotaDefaultLimitBytes() > 0
+ && config.getBacklogQuotaDefaultLimitBytes()
+ >= (config.getDefaultRetentionSizeInMB() * 1024L * 1024L))
{
+ throw new IllegalArgumentException(String.format("The
retention size must > the backlog quota limit "
+ + "size, but the configured backlog quota
limit bytes is %d, the retention size is %d",
+ config.getBacklogQuotaDefaultLimitBytes(),
+ config.getDefaultRetentionSizeInMB() * 1024L * 1024L));
+ }
+
+ if (config.getDefaultRetentionTimeInMinutes() > 0
+ && config.getBacklogQuotaDefaultLimitSecond() > 0
+ && config.getBacklogQuotaDefaultLimitSecond() >=
config.getDefaultRetentionTimeInMinutes() * 60) {
+ throw new IllegalArgumentException(String.format("The
retention time must > the backlog quota limit "
+ + "time, but the configured backlog quota
limit time duration is %d, "
+ + "the retention time duration is %d",
+ config.getBacklogQuotaDefaultLimitSecond(),
+ config.getDefaultRetentionTimeInMinutes() * 60));
+ }
+
if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
&& config.isLoadBalancerEnabled()
&& LinuxInfoUtils.isLinux()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 14dc1176a52..bb616356d51 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
import java.util.Optional;
import lombok.Cleanup;
@@ -142,4 +144,77 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:" +
pulsar.getWebService().getListenPortHTTPS().get());
}
+    /**
+     * Exercises the start-up check that compares the broker's default
+     * retention with its default backlog quota.
+     *
+     * <p>Start-up must be rejected with an {@link IllegalArgumentException}
+     * only when both retention and backlog quota are configured (&gt; 0) and
+     * the backlog quota is greater than or equal to the retention, either in
+     * size or in time. Any other failure of {@code start()} (or a successful
+     * start) counts as "not rejected" for the purpose of this test.
+     */
+    @Test
+    public void testBacklogAndRetentionCheck() {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setClusterName("test");
+        config.setMetadataStoreUrl("memory:local");
+        config.setMetadataStoreConfigPath("memory:local");
+
+        // Check the default configuration
+        assertFalse(startRejectsConfig(config));
+
+        // Only set retention
+        config.setDefaultRetentionSizeInMB(5);
+        config.setDefaultRetentionTimeInMinutes(5);
+        assertFalse(startRejectsConfig(config));
+
+        // Set both retention and backlog quota, quota below retention
+        config.setBacklogQuotaDefaultLimitBytes(4 * 1024 * 1024);
+        config.setBacklogQuotaDefaultLimitSecond(4 * 60);
+        assertFalse(startRejectsConfig(config));
+
+        // Invalid: backlog quota size (6 MB) exceeds retention size (5 MB)
+        config.setBacklogQuotaDefaultLimitBytes(6 * 1024 * 1024);
+        assertTrue(startRejectsConfig(config));
+
+        // Invalid: backlog quota time (6 min) exceeds retention time (5 min)
+        config.setBacklogQuotaDefaultLimitBytes(4 * 1024 * 1024);
+        config.setBacklogQuotaDefaultLimitSecond(6 * 60);
+        assertTrue(startRejectsConfig(config));
+
+        // Only set backlog quota
+        config.setDefaultRetentionSizeInMB(0);
+        config.setDefaultRetentionTimeInMinutes(0);
+        assertFalse(startRejectsConfig(config));
+    }
+
+    /**
+     * Starts a fresh {@link PulsarService} with the given configuration and
+     * reports whether start-up failed because of an
+     * {@link IllegalArgumentException}.
+     *
+     * <p>The result is returned instead of being asserted inside the
+     * {@code catch} block so that a start-up which unexpectedly succeeds
+     * cannot make an "expected rejection" scenario pass silently.
+     *
+     * @param config the broker configuration to start with
+     * @return {@code true} if {@code start()} threw an exception whose cause
+     *         is an {@link IllegalArgumentException}; {@code false} if it
+     *         started or failed for any other reason
+     */
+    private static boolean startRejectsConfig(ServiceConfiguration config) {
+        PulsarService pulsarService = new PulsarService(config);
+        try {
+            pulsarService.start();
+            return false;
+        } catch (Exception e) {
+            return e.getCause() instanceof IllegalArgumentException;
+        } finally {
+            try {
+                // Release whatever the (possibly partial) start-up acquired so
+                // the scenarios do not leak resources into each other.
+                pulsarService.close();
+            } catch (Exception ignored) {
+                // Best-effort cleanup: a close failure is irrelevant to the
+                // validation being tested here.
+            }
+        }
+    }
}