This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5373d829a9 [improve][java-client] Add verification for configured 
default backlog quota and retention (#15441)
d5373d829a9 is described below

commit d5373d829a9fd2d4e962bc8203c61ffbe087be39
Author: lipenghui <[email protected]>
AuthorDate: Thu May 5 19:13:27 2022 +0800

    [improve][java-client] Add verification for configured default backlog 
quota and retention (#15441)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 20 ++++++
 .../apache/pulsar/broker/PulsarServiceTest.java    | 75 ++++++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3c4aa25ddf3..372ca4e4250 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -652,6 +652,26 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         + "authenticationEnabled=true when authorization is 
enabled with authorizationEnabled=true.");
             }
 
+            if (config.getDefaultRetentionSizeInMB() > 0
+                    && config.getBacklogQuotaDefaultLimitBytes() > 0
+                    && config.getBacklogQuotaDefaultLimitBytes()
+                    >= (config.getDefaultRetentionSizeInMB() * 1024L * 1024L)) 
{
+                throw new IllegalArgumentException(String.format("The 
retention size must > the backlog quota limit "
+                                + "size, but the configured backlog quota 
limit bytes is %d, the retention size is %d",
+                        config.getBacklogQuotaDefaultLimitBytes(),
+                        config.getDefaultRetentionSizeInMB() * 1024L * 1024L));
+            }
+
+            if (config.getDefaultRetentionTimeInMinutes() > 0
+                    && config.getBacklogQuotaDefaultLimitSecond() > 0
+                    && config.getBacklogQuotaDefaultLimitSecond() >= 
config.getDefaultRetentionTimeInMinutes() * 60) {
+                throw new IllegalArgumentException(String.format("The 
retention time must > the backlog quota limit "
+                                + "time, but the configured backlog quota 
limit time duration is %d, "
+                                + "the retention time duration is %d",
+                        config.getBacklogQuotaDefaultLimitSecond(),
+                        config.getDefaultRetentionTimeInMinutes() * 60));
+            }
+
             if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
                     && config.isLoadBalancerEnabled()
                     && LinuxInfoUtils.isLinux()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 14dc1176a52..bb616356d51 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertSame;
 import java.util.Optional;
 import lombok.Cleanup;
@@ -142,4 +144,77 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(pulsar.getWebServiceAddressTls(), "https://localhost:"; + 
pulsar.getWebService().getListenPortHTTPS().get());
     }
 
+    @Test
+    public void testBacklogAndRetentionCheck() {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setClusterName("test");
+        config.setMetadataStoreUrl("memory:local");
+        config.setMetadataStoreConfigPath("memory:local");
+        PulsarService pulsarService = new PulsarService(config);
+
+        // Check the default configuration
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertFalse(e.getCause() instanceof IllegalArgumentException);
+        }
+
+        // Only set retention
+        config.setDefaultRetentionSizeInMB(5);
+        config.setDefaultRetentionTimeInMinutes(5);
+
+        pulsarService = new PulsarService(config);
+
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertFalse(e.getCause() instanceof IllegalArgumentException);
+        }
+
+        // Set both retention and backlog quota
+        config.setBacklogQuotaDefaultLimitBytes(4 * 1024 * 1024);
+        config.setBacklogQuotaDefaultLimitSecond(4 * 60);
+
+        pulsarService = new PulsarService(config);
+
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertFalse(e.getCause() instanceof IllegalArgumentException);
+        }
+
+        // Set invalidated retention and backlog quota
+        config.setBacklogQuotaDefaultLimitBytes(6 * 1024 * 1024);
+
+        pulsarService = new PulsarService(config);
+
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+        }
+
+        config.setBacklogQuotaDefaultLimitBytes(4 * 1024 * 1024);
+        config.setBacklogQuotaDefaultLimitSecond(6 * 60);
+
+        pulsarService = new PulsarService(config);
+
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+        }
+
+        // Only set backlog quota
+        config.setDefaultRetentionSizeInMB(0);
+        config.setDefaultRetentionTimeInMinutes(0);
+
+        pulsarService = new PulsarService(config);
+
+        try {
+            pulsarService.start();
+        } catch (Exception e) {
+            assertFalse(e.getCause() instanceof IllegalArgumentException);
+        }
+    }
 }

Reply via email to