This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 980dfc81a51 [improve][cli] Add some checks for topic-level 
`setOffloadPolicies` (#20943)
980dfc81a51 is described below

commit 980dfc81a5165147615232692e7b5a71b61c71a7
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Aug 14 09:27:36 2023 +0800

    [improve][cli] Add some checks for topic-level `setOffloadPolicies` (#20943)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 23 ++++++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 22 -----------
 .../broker/admin/impl/PersistentTopicsBase.java    |  4 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  1 +
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 17 +++++----
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 +-------
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 43 +++++++++++++++++++---
 .../apache/pulsar/admin/cli/utils/CmdUtils.java    | 14 +++++++
 8 files changed, 89 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 828b18c2df9..e9beab90b5f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PolicyName;
@@ -852,4 +853,26 @@ public abstract class AdminResource extends 
PulsarWebResource {
     protected AuthorizationService getAuthorizationService() {
         return pulsar().getBrokerService().getAuthorizationService();
     }
+
+    protected void validateOffloadPolicies(OffloadPoliciesImpl 
offloadPolicies) {
+        if (offloadPolicies == null) {
+            log.warn("[{}] Failed to update offload configuration for 
namespace {}: offloadPolicies is null",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The offloadPolicies must be specified for namespace 
offload.");
+        }
+        if (!offloadPolicies.driverSupported()) {
+            log.warn("[{}] Failed to update offload configuration for 
namespace {}: "
+                            + "driver is not supported, support value: {}",
+                    clientAppId(), namespaceName, 
OffloadPoliciesImpl.getSupportedDriverNames());
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The driver is not supported, support value: " + 
OffloadPoliciesImpl.getSupportedDriverNames());
+        }
+        if (!offloadPolicies.bucketValid()) {
+            log.warn("[{}] Failed to update offload configuration for 
namespace {}: bucket must be specified",
+                    clientAppId(), namespaceName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "The bucket must be specified for namespace offload.");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 789a8e6dbdc..ae36c5c30cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2316,28 +2316,6 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
-    private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
-        if (offloadPolicies == null) {
-            log.warn("[{}] Failed to update offload configuration for 
namespace {}: offloadPolicies is null",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The offloadPolicies must be specified for namespace 
offload.");
-        }
-        if (!offloadPolicies.driverSupported()) {
-            log.warn("[{}] Failed to update offload configuration for 
namespace {}: "
-                            + "driver is not supported, support value: {}",
-                    clientAppId(), namespaceName, 
OffloadPoliciesImpl.getSupportedDriverNames());
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The driver is not supported, support value: " + 
OffloadPoliciesImpl.getSupportedDriverNames());
-        }
-        if (!offloadPolicies.bucketValid()) {
-            log.warn("[{}] Failed to update offload configuration for 
namespace {}: bucket must be specified",
-                    clientAppId(), namespaceName);
-            throw new RestException(Status.PRECONDITION_FAILED,
-                    "The bucket must be specified for namespace offload.");
-        }
-    }
-
    protected void internalRemoveMaxTopicsPerNamespace() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.WRITE);
         internalSetMaxTopicsPerNamespace(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682eb7b12e9..56598f5cc45 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -940,8 +940,8 @@ public class PersistentTopicsBase extends AdminResource {
             });
     }
 
-    protected CompletableFuture<Void> internalSetOffloadPolicies
-            (OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
+    protected CompletableFuture<Void> 
internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
+                                                                 boolean 
isGlobal) {
         return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1927d4b244a..df50c2721b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -380,6 +380,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
+            .thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
             .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index c3265897b87..95b0d48c69a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -213,6 +213,8 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) 
admin.topics().getOffloadPolicies(topicName);
         assertNull(offloadPolicies);
         OffloadPoliciesImpl offload = new OffloadPoliciesImpl();
+        offload.setManagedLedgerOffloadDriver("S3");
+        offload.setManagedLedgerOffloadBucket("bucket");
         String path = "fileSystemPath";
         offload.setFileSystemProfilePath(path);
         admin.topics().setOffloadPolicies(topicName, offload);
@@ -404,12 +406,13 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         //3 construct a topic level offloadPolicies
         OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
         offloadPolicies.setOffloadersDirectory(".");
-        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadDriver("S3");
+        offloadPolicies.setManagedLedgerOffloadBucket("bucket");
         offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
         offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);
 
         LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
-        when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
+        when(topicOffloader.getOffloadDriverName()).thenReturn("S3");
         
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());
 
         //4 set topic level offload policies
@@ -423,18 +426,18 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
                         
.getTopic(TopicName.get(topicName).getPartition(i).toString(), 
false).get().get();
                 
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
                 
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                        , "mock");
+                        , "S3");
             }
         } else {
             PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
                     .getTopic(topicName, false).get().get();
             
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
             
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                    , "mock");
+                    , "S3");
         }
         //6 remove topic level offload policy, offloader should become 
namespaceOffloader
         LedgerOffloader namespaceOffloader = mock(LedgerOffloader.class);
-        when(namespaceOffloader.getOffloadDriverName()).thenReturn("s3");
+        when(namespaceOffloader.getOffloadDriverName()).thenReturn("S3");
         Map<NamespaceName, LedgerOffloader> map = new HashMap<>();
         map.put(TopicName.get(topicName).getNamespaceObject(), 
namespaceOffloader);
         doReturn(map).when(pulsar).getLedgerOffloaderMap();
@@ -450,14 +453,14 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
                         
.getTopicIfExists(TopicName.get(topicName).getPartition(i).toString()).get().get();
                 
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
                 
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                        , "s3");
+                        , "S3");
             }
         } else {
             PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
                     .getTopic(topicName, false).get().get();
             
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
             
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
-                    , "s3");
+                    , "S3");
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 5901d7c177e..33277fdb608 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
@@ -2319,20 +2321,6 @@ public class CmdNamespaces extends CmdBase {
             return driver.equalsIgnoreCase(driverNames.get(0)) || 
driver.equalsIgnoreCase(driverNames.get(1));
         }
 
-        public boolean positiveCheck(String paramName, long value) {
-            if (value <= 0) {
-                throw new ParameterException(paramName + " cannot be less than 
or equal to 0!");
-            }
-            return true;
-        }
-
-        public boolean maxValueCheck(String paramName, long value, long 
maxValue) {
-            if (value > maxValue) {
-                throw new ParameterException(paramName + " cannot be greater 
than " + maxValue + "!");
-            }
-            return true;
-        }
-
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index d567d0b3671..2914a5e8a08 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
+import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -1764,24 +1767,24 @@ public class CmdTopicPolicies extends CmdBase {
         @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
                 description = "ManagedLedger offload max block Size in bytes,"
                         + "s3 and google-cloud-storage requires this 
parameter")
-        private int maxBlockSizeInBytes;
+        private int maxBlockSizeInBytes = 
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
 
         @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
                 description = "ManagedLedger offload read buffer size in 
bytes,"
                         + "s3 and google-cloud-storage requires this 
parameter")
-        private int readBufferSizeInBytes;
+        private int readBufferSizeInBytes = 
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
 
         @Parameter(names = {"-t", "--offloadThresholdInBytes"}
-                , description = "ManagedLedger offload threshold in bytes", 
required = true)
-        private long offloadThresholdInBytes;
+                , description = "ManagedLedger offload threshold in bytes")
+        private Long offloadThresholdInBytes = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
 
         @Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
                 , description = "ManagedLedger offload threshold in seconds")
-        private Long offloadThresholdInSeconds;
+        private Long offloadThresholdInSeconds = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
 
         @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
                 , description = "ManagedLedger offload deletion lag in bytes")
-        private Long offloadDeletionLagInMillis;
+        private Long offloadDeletionLagInMillis = 
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
 
         @Parameter(
                 names = {"--offloadedReadPriority", "-orp"},
@@ -1798,10 +1801,38 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other 
clusters asynchronously")
         private boolean isGlobal = false;
 
+        public final List<String> driverNames = 
OffloadPoliciesImpl.DRIVER_NAMES;
+
+        public boolean driverSupported(String driver) {
+            return driverNames.stream().anyMatch(d -> 
d.equalsIgnoreCase(driver));
+        }
+
+        public boolean isS3Driver(String driver) {
+            if (StringUtils.isEmpty(driver)) {
+                return false;
+            }
+            return driver.equalsIgnoreCase(driverNames.get(0)) || 
driver.equalsIgnoreCase(driverNames.get(1));
+        }
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
 
+            if (!driverSupported(driver)) {
+                throw new ParameterException("The driver " + driver + " is not 
supported, "
+                        + "(Possible values: " + String.join(",", driverNames) 
+ ").");
+            }
+
+            if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && 
Strings.isNullOrEmpty(endpoint)) {
+                throw new ParameterException(
+                        "Either s3ManagedLedgerOffloadRegion or 
s3ManagedLedgerOffloadServiceEndpoint must be set"
+                                + " if s3 offload enabled");
+            }
+            positiveCheck("maxBlockSizeInBytes", maxBlockSizeInBytes);
+            maxValueCheck("maxBlockSizeInBytes", maxBlockSizeInBytes, 
Integer.MAX_VALUE);
+            positiveCheck("readBufferSizeInBytes", readBufferSizeInBytes);
+            maxValueCheck("readBufferSizeInBytes", readBufferSizeInBytes, 
Integer.MAX_VALUE);
+
             OffloadedReadPriority offloadedReadPriority = 
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
 
             if (this.offloadReadPriorityStr != null) {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
index a8659e066e4..a4db39f9cc9 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java
@@ -56,4 +56,18 @@ public class CmdUtils {
             }
         }
     }
+
+    public static boolean positiveCheck(String paramName, long value) {
+        if (value <= 0) {
+            throw new ParameterException(paramName + " cannot be less than or 
equal to 0!");
+        }
+        return true;
+    }
+
+    public static boolean maxValueCheck(String paramName, long value, long 
maxValue) {
+        if (value > maxValue) {
+            throw new ParameterException(paramName + " cannot be greater than 
" + maxValue + "!");
+        }
+        return true;
+    }
 }

Reply via email to