This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 204905ee054 [improve][offload]keep topic/ns set-offload-policies 
consistent behavior logic (#20646)
204905ee054 is described below

commit 204905ee0548c3a1dd5b297fea999456d266a686
Author: YingQun Zhong <[email protected]>
AuthorDate: Thu Jun 29 12:10:52 2023 +0800

    [improve][offload]keep topic/ns set-offload-policies consistent behavior 
logic (#20646)
    
    Co-authored-by: tison <[email protected]>
---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  55 ++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  18 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 141 ++++++++++++++++++---
 3 files changed, 188 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 282336b4c37..8509d037cba 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -356,6 +356,33 @@ public class PulsarAdminToolTest {
     }
 
     @Test
+    public void namespacesSetOffloadPolicies() throws Exception {
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        Namespaces mockNamespaces = mock(Namespaces.class);
+        when(admin.namespaces()).thenReturn(mockNamespaces);
+        Lookup mockLookup = mock(Lookup.class);
+        when(admin.lookups()).thenReturn(mockLookup);
+
+        // filesystem offload
+        CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
+        namespaces.run(split(
+          "set-offload-policies myprop/clust/ns2 -d filesystem -oat 100M -oats 
1h -oae 1h -orp bookkeeper-first"));
+        verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns2",
+          OffloadPoliciesImpl.create("filesystem", null, null,
+            null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
+            100 * 1024 * 1024L, 3600L, 3600 * 1000L, 
OffloadedReadPriority.BOOKKEEPER_FIRST));
+
+        // S3 offload
+        CmdNamespaces namespaces2 = new CmdNamespaces(() -> admin);
+        namespaces2.run(split(
+          "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b 
test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae 
10s -orp tiered-storage-first"));
+        verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
+          OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
+            "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5 
* 1024 * 1024,
+            10 * 1024 * 1024L, 100L, 10000L, 
OffloadedReadPriority.TIERED_STORAGE_FIRST));
+    }
+
+        @Test
     public void namespaces() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
         Namespaces mockNamespaces = mock(Namespaces.class);
@@ -1455,6 +1482,34 @@ public class PulsarAdminToolTest {
         
verify(mockGlobalTopicsPolicies).removeAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1");
     }
 
+    @Test
+    public void topicsSetOffloadPolicies() throws Exception {
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        Topics mockTopics = mock(Topics.class);
+        when(admin.topics()).thenReturn(mockTopics);
+        Schemas mockSchemas = mock(Schemas.class);
+        when(admin.schemas()).thenReturn(mockSchemas);
+        Lookup mockLookup = mock(Lookup.class);
+        when(admin.lookups()).thenReturn(mockLookup);
+
+        // filesystem offload
+        CmdTopics cmdTopics = new CmdTopics(() -> admin);
+        cmdTopics.run(split("set-offload-policies 
persistent://myprop/clust/ns1/ds1 -d filesystem -oat 100M -oats 1h -oae 1h -orp 
bookkeeper-first"));
+        OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create("filesystem", null, null
+          , null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
+          100 * 1024 * 1024L, 3600L, 3600 * 1000L, 
OffloadedReadPriority.BOOKKEEPER_FIRST);
+        
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", 
offloadPolicies);
+
+//         S3 offload
+        CmdTopics cmdTopics2 = new CmdTopics(() -> admin);
+        cmdTopics2.run(split("set-offload-policies 
persistent://myprop/clust/ns1/ds2 -d s3 -r region -b bucket -e endpoint -ts 50 
-m 8 -rb 9 -t 10 -orp tiered-storage-first"));
+        OffloadPoliciesImpl offloadPolicies2 = 
OffloadPoliciesImpl.create("s3", "region", "bucket"
+          , "endpoint", null, null, null, null,
+          8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST);
+        
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds2", 
offloadPolicies2);
+    }
+
+
     @Test
     public void topics() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 998591f8177..3d18b97060a 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2229,7 +2229,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(
                 names = {"--bucket", "-b"},
                 description = "Bucket to place offloaded ledger into",
-                required = true)
+                required = false)
         private String bucket;
 
         @Parameter(
@@ -2265,7 +2265,8 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(
                 names = {"--maxBlockSize", "-mbs"},
-                description = "Max block size (eg: 32M, 64M), default is 64MB",
+                description = "Max block size (eg: 32M, 64M), default is 64MB"
+                  + "s3 and google-cloud-storage requires this parameter",
                 required = false)
         private String maxBlockSizeStr;
 
@@ -2277,7 +2278,8 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(
                 names = {"--offloadAfterElapsed", "-oae"},
-                description = "Offload after elapsed in minutes (or minutes, 
hours,days,weeks eg: 100m, 3h, 2d, 5w).",
+                description = "Delay time in Millis for deleting the 
bookkeeper ledger after offload "
+                    + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 
2d, 5w).",
                 required = false)
         private String offloadAfterElapsedStr;
 
@@ -2289,7 +2291,7 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(
                 names = {"--offloadAfterThresholdInSeconds", "-oats"},
-                description = "Offload after threshold seconds (eg: 1,5,10)",
+                description = "Offload after threshold seconds (or 
minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).",
                 required = false
         )
         private String offloadAfterThresholdInSecondsStr;
@@ -2390,7 +2392,13 @@ public class CmdNamespaces extends CmdBase {
 
             Long offloadThresholdInSeconds = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
             if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
-                long offloadThresholdInSeconds0 = 
Long.parseLong(offloadAfterThresholdInSecondsStr.trim());
+                Long offloadThresholdInSeconds0;
+                try {
+                    offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
+                      
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
                 if (maxValueCheck("OffloadAfterThresholdInSeconds", 
offloadThresholdInSeconds0, Long.MAX_VALUE)) {
                     offloadThresholdInSeconds = offloadThresholdInSeconds0;
                 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f96410749ae..b07ad005661 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -23,6 +23,7 @@ import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.ListTopicsOptions;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
@@ -67,6 +69,7 @@ import 
org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -2143,27 +2146,32 @@ public class CmdTopics extends CmdBase {
                 , description = "S3 role session name used for 
STSAssumeRoleSessionCredentialsProvider")
         private String s3RoleSessionName;
 
-        @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
-                description = "ManagedLedger offload max block Size in bytes,"
-                + "s3 and google-cloud-storage requires this parameter")
-        private int maxBlockSizeInBytes;
+        @Parameter(
+                names = {"-m", "--maxBlockSizeInBytes", "--maxBlockSize", 
"-mbs"},
+                description = "Max block size (eg: 32M, 64M), default is 64MB"
+                + "s3 and google-cloud-storage requires this parameter",
+                required = false)
+        private String maxBlockSizeStr;
 
-        @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
-                description = "ManagedLedger offload read buffer size in 
bytes,"
-                + "s3 and google-cloud-storage requires this parameter")
-        private int readBufferSizeInBytes;
+        @Parameter(
+                names = {"-rb", "--readBufferSizeInBytes", "--readBufferSize", 
"-rbs"},
+                description = "Read buffer size (eg: 1M, 5M), default is 1MB"
+                + "s3 and google-cloud-storage requires this parameter",
+                required = false)
+        private String readBufferSizeStr;
 
-        @Parameter(names = {"-t", "--offloadThresholdInBytes"}
-                , description = "ManagedLedger offload threshold in bytes", 
required = true)
-        private long offloadThresholdInBytes;
+        @Parameter(names = {"-t", "--offloadThresholdInBytes", 
"--offloadAfterThreshold", "-oat"}
+                , description = "Offload after threshold size (eg: 1M, 5M)", 
required = false)
+        private String offloadAfterThresholdStr;
 
-        @Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
-                , description = "ManagedLedger offload threshold in seconds")
-        private Long offloadThresholdInSeconds;
+        @Parameter(names = {"-ts", "--offloadThresholdInSeconds", 
"--offloadAfterThresholdInSeconds", "-oats"},
+          description = "Offload after threshold seconds (or 
minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).")
+        private String offloadAfterThresholdInSecondsStr;
 
-        @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
-                , description = "ManagedLedger offload deletion lag in bytes")
-        private Long offloadDeletionLagInMillis;
+        @Parameter(names = {"-dl", "--offloadDeletionLagInMillis", 
"--offloadAfterElapsed", "-oae"}
+                , description = "Delay time in Millis for deleting the 
bookkeeper ledger after offload "
+          + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).")
+        private String offloadAfterElapsedStr;
 
         @Parameter(names = {"--offloadedReadPriority", "-orp"},
                 description = "Read priority for offloaded messages. "
@@ -2175,10 +2183,102 @@ public class CmdTopics extends CmdBase {
         )
         private String offloadReadPriorityStr;
 
+        public final List<String> driverNames = 
OffloadPoliciesImpl.DRIVER_NAMES;
+
+        public boolean driverSupported(String driver) {
+            return driverNames.stream().anyMatch(d -> 
d.equalsIgnoreCase(driver));
+        }
+
+        public boolean isS3Driver(String driver) {
+            if (StringUtils.isEmpty(driver)) {
+                return false;
+            }
+            return driver.equalsIgnoreCase(driverNames.get(0)) || 
driver.equalsIgnoreCase(driverNames.get(1));
+        }
+
+        public boolean positiveCheck(String paramName, long value) {
+            if (value <= 0) {
+                throw new ParameterException(paramName + " is not be negative 
or 0!");
+            }
+            return true;
+        }
+
+        public boolean maxValueCheck(String paramName, long value, long 
maxValue) {
+            if (value > maxValue) {
+                throw new ParameterException(paramName + " is not bigger than 
" + maxValue + "!");
+            }
+            return true;
+        }
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
 
+            if (!driverSupported(driver)) {
+                throw new ParameterException(
+                  "The driver " + driver + " is not supported, "
+                    + "(Possible values: " + String.join(",", driverNames) + 
").");
+            }
+            if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && 
Strings.isNullOrEmpty(endpoint)) {
+                throw new ParameterException(
+                  "Either s3ManagedLedgerOffloadRegion or 
s3ManagedLedgerOffloadServiceEndpoint must be set"
+                    + " if s3 offload enabled");
+            }
+
+            int maxBlockSizeInBytes = 
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+            if (StringUtils.isNotEmpty(maxBlockSizeStr)) {
+                long maxBlockSize = validateSizeString(maxBlockSizeStr);
+                if (positiveCheck("MaxBlockSize", maxBlockSize)
+                  && maxValueCheck("MaxBlockSize", maxBlockSize, 
Integer.MAX_VALUE)) {
+                    maxBlockSizeInBytes = 
Long.valueOf(maxBlockSize).intValue();
+                }
+            }
+
+            int readBufferSizeInBytes = 
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
+            if (StringUtils.isNotEmpty(readBufferSizeStr)) {
+                long readBufferSize = validateSizeString(readBufferSizeStr);
+                if (positiveCheck("ReadBufferSize", readBufferSize)
+                  && maxValueCheck("ReadBufferSize", readBufferSize, 
Integer.MAX_VALUE)) {
+                    readBufferSizeInBytes = 
Long.valueOf(readBufferSize).intValue();
+                }
+            }
+
+            Long offloadAfterElapsedInMillis = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+            if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
+                Long offloadAfterElapsed;
+                try {
+                    offloadAfterElapsed = TimeUnit.SECONDS.toMillis(
+                      
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
+                if (maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, 
Long.MAX_VALUE)) {
+                    offloadAfterElapsedInMillis = offloadAfterElapsed;
+                }
+            }
+
+            Long offloadAfterThresholdInBytes = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+            if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
+                long offloadAfterThreshold = 
validateSizeString(offloadAfterThresholdStr);
+                if (maxValueCheck("OffloadAfterThreshold", 
offloadAfterThreshold, Long.MAX_VALUE)) {
+                    offloadAfterThresholdInBytes = offloadAfterThreshold;
+                }
+            }
+
+            Long offloadThresholdInSeconds = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
+            if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
+                Long offloadThresholdInSeconds0;
+                try {
+                    offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
+                      
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
+                if (maxValueCheck("OffloadAfterThresholdInSeconds", 
offloadThresholdInSeconds0, Long.MAX_VALUE)) {
+                    offloadThresholdInSeconds = offloadThresholdInSeconds0;
+                }
+            }
+
             OffloadedReadPriority offloadedReadPriority = 
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
 
             if (this.offloadReadPriorityStr != null) {
@@ -2193,12 +2293,11 @@ public class CmdTopics extends CmdBase {
                 }
             }
 
-            OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
+            OffloadPolicies offloadPolicies = 
OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
                     s3Role, s3RoleSessionName,
                     awsId, awsSecret,
-                    maxBlockSizeInBytes,
-                    readBufferSizeInBytes, offloadThresholdInBytes, 
offloadThresholdInSeconds,
-                    offloadDeletionLagInMillis, offloadedReadPriority);
+                    maxBlockSizeInBytes, readBufferSizeInBytes, 
offloadAfterThresholdInBytes,
+                    offloadThresholdInSeconds, offloadAfterElapsedInMillis, 
offloadedReadPriority);
 
             getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
         }

Reply via email to