This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 204905ee054 [improve][offload]keep topic/ns set-offload-policies
consistent behavior logic (#20646)
204905ee054 is described below
commit 204905ee0548c3a1dd5b297fea999456d266a686
Author: YingQun Zhong <[email protected]>
AuthorDate: Thu Jun 29 12:10:52 2023 +0800
[improve][offload]keep topic/ns set-offload-policies consistent behavior
logic (#20646)
Co-authored-by: tison <[email protected]>
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 55 ++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 18 ++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 141 ++++++++++++++++++---
3 files changed, 188 insertions(+), 26 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 282336b4c37..8509d037cba 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -356,6 +356,33 @@ public class PulsarAdminToolTest {
}
@Test
+ public void namespacesSetOffloadPolicies() throws Exception {
+ PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+ Namespaces mockNamespaces = mock(Namespaces.class);
+ when(admin.namespaces()).thenReturn(mockNamespaces);
+ Lookup mockLookup = mock(Lookup.class);
+ when(admin.lookups()).thenReturn(mockLookup);
+
+ // filesystem offload
+ CmdNamespaces namespaces = new CmdNamespaces(() -> admin);
+ namespaces.run(split(
+ "set-offload-policies myprop/clust/ns2 -d filesystem -oat 100M -oats
1h -oae 1h -orp bookkeeper-first"));
+ verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns2",
+ OffloadPoliciesImpl.create("filesystem", null, null,
+ null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
+ 100 * 1024 * 1024L, 3600L, 3600 * 1000L,
OffloadedReadPriority.BOOKKEEPER_FIRST));
+
+ // S3 offload
+ CmdNamespaces namespaces2 = new CmdNamespaces(() -> admin);
+ namespaces2.run(split(
+ "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b
test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oats 100 -oae
10s -orp tiered-storage-first"));
+ verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
+ OffloadPoliciesImpl.create("aws-s3", "test-region", "test-bucket",
+ "http://test.endpoint",null, null, null, null, 32 * 1024 * 1024, 5
* 1024 * 1024,
+ 10 * 1024 * 1024L, 100L, 10000L,
OffloadedReadPriority.TIERED_STORAGE_FIRST));
+ }
+
+ @Test
public void namespaces() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Namespaces mockNamespaces = mock(Namespaces.class);
@@ -1455,6 +1482,34 @@ public class PulsarAdminToolTest {
verify(mockGlobalTopicsPolicies).removeAutoSubscriptionCreation("persistent://prop/clust/ns1/ds1");
}
+ @Test
+ public void topicsSetOffloadPolicies() throws Exception {
+ PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+ Topics mockTopics = mock(Topics.class);
+ when(admin.topics()).thenReturn(mockTopics);
+ Schemas mockSchemas = mock(Schemas.class);
+ when(admin.schemas()).thenReturn(mockSchemas);
+ Lookup mockLookup = mock(Lookup.class);
+ when(admin.lookups()).thenReturn(mockLookup);
+
+ // filesystem offload
+ CmdTopics cmdTopics = new CmdTopics(() -> admin);
+ cmdTopics.run(split("set-offload-policies
persistent://myprop/clust/ns1/ds1 -d filesystem -oat 100M -oats 1h -oae 1h -orp
bookkeeper-first"));
+ OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create("filesystem", null, null
+ , null, null, null, null, null, 64 * 1024 * 1024, 1024 * 1024,
+ 100 * 1024 * 1024L, 3600L, 3600 * 1000L,
OffloadedReadPriority.BOOKKEEPER_FIRST);
+
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
offloadPolicies);
+
+// S3 offload
+ CmdTopics cmdTopics2 = new CmdTopics(() -> admin);
+ cmdTopics2.run(split("set-offload-policies
persistent://myprop/clust/ns1/ds2 -d s3 -r region -b bucket -e endpoint -ts 50
-m 8 -rb 9 -t 10 -orp tiered-storage-first"));
+ OffloadPoliciesImpl offloadPolicies2 =
OffloadPoliciesImpl.create("s3", "region", "bucket"
+ , "endpoint", null, null, null, null,
+ 8, 9, 10L, 50L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST);
+
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds2",
offloadPolicies2);
+ }
+
+
@Test
public void topics() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 998591f8177..3d18b97060a 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2229,7 +2229,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--bucket", "-b"},
description = "Bucket to place offloaded ledger into",
- required = true)
+ required = false)
private String bucket;
@Parameter(
@@ -2265,7 +2265,8 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--maxBlockSize", "-mbs"},
- description = "Max block size (eg: 32M, 64M), default is 64MB",
+ description = "Max block size (eg: 32M, 64M), default is 64MB"
+ + "s3 and google-cloud-storage requires this parameter",
required = false)
private String maxBlockSizeStr;
@@ -2277,7 +2278,8 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--offloadAfterElapsed", "-oae"},
- description = "Offload after elapsed in minutes (or minutes,
hours,days,weeks eg: 100m, 3h, 2d, 5w).",
+ description = "Delay time in Millis for deleting the
bookkeeper ledger after offload "
+ + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h,
2d, 5w).",
required = false)
private String offloadAfterElapsedStr;
@@ -2289,7 +2291,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--offloadAfterThresholdInSeconds", "-oats"},
- description = "Offload after threshold seconds (eg: 1,5,10)",
+ description = "Offload after threshold seconds (or
minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).",
required = false
)
private String offloadAfterThresholdInSecondsStr;
@@ -2390,7 +2392,13 @@ public class CmdNamespaces extends CmdBase {
Long offloadThresholdInSeconds =
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
- long offloadThresholdInSeconds0 =
Long.parseLong(offloadAfterThresholdInSecondsStr.trim());
+ Long offloadThresholdInSeconds0;
+ try {
+ offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
+
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException(exception.getMessage());
+ }
if (maxValueCheck("OffloadAfterThresholdInSeconds",
offloadThresholdInSeconds0, Long.MAX_VALUE)) {
offloadThresholdInSeconds = offloadThresholdInSeconds0;
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f96410749ae..b07ad005661 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -23,6 +23,7 @@ import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
@@ -67,6 +69,7 @@ import
org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -2143,27 +2146,32 @@ public class CmdTopics extends CmdBase {
, description = "S3 role session name used for
STSAssumeRoleSessionCredentialsProvider")
private String s3RoleSessionName;
- @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
- description = "ManagedLedger offload max block Size in bytes,"
- + "s3 and google-cloud-storage requires this parameter")
- private int maxBlockSizeInBytes;
+ @Parameter(
+ names = {"-m", "--maxBlockSizeInBytes", "--maxBlockSize",
"-mbs"},
+ description = "Max block size (eg: 32M, 64M), default is 64MB"
+ + "s3 and google-cloud-storage requires this parameter",
+ required = false)
+ private String maxBlockSizeStr;
- @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
- description = "ManagedLedger offload read buffer size in
bytes,"
- + "s3 and google-cloud-storage requires this parameter")
- private int readBufferSizeInBytes;
+ @Parameter(
+ names = {"-rb", "--readBufferSizeInBytes", "--readBufferSize",
"-rbs"},
+ description = "Read buffer size (eg: 1M, 5M), default is 1MB"
+ + "s3 and google-cloud-storage requires this parameter",
+ required = false)
+ private String readBufferSizeStr;
- @Parameter(names = {"-t", "--offloadThresholdInBytes"}
- , description = "ManagedLedger offload threshold in bytes",
required = true)
- private long offloadThresholdInBytes;
+ @Parameter(names = {"-t", "--offloadThresholdInBytes",
"--offloadAfterThreshold", "-oat"}
+ , description = "Offload after threshold size (eg: 1M, 5M)",
required = false)
+ private String offloadAfterThresholdStr;
- @Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
- , description = "ManagedLedger offload threshold in seconds")
- private Long offloadThresholdInSeconds;
+ @Parameter(names = {"-ts", "--offloadThresholdInSeconds",
"--offloadAfterThresholdInSeconds", "-oats"},
+ description = "Offload after threshold seconds (or
minutes,hours,days,weeks eg: 100m, 3h, 2d, 5w).")
+ private String offloadAfterThresholdInSecondsStr;
- @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
- , description = "ManagedLedger offload deletion lag in bytes")
- private Long offloadDeletionLagInMillis;
+ @Parameter(names = {"-dl", "--offloadDeletionLagInMillis",
"--offloadAfterElapsed", "-oae"}
+ , description = "Delay time in Millis for deleting the
bookkeeper ledger after offload "
+ + "(or seconds,minutes,hours,days,weeks eg: 10s, 100m, 3h, 2d, 5w).")
+ private String offloadAfterElapsedStr;
@Parameter(names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. "
@@ -2175,10 +2183,102 @@ public class CmdTopics extends CmdBase {
)
private String offloadReadPriorityStr;
+ public final List<String> driverNames =
OffloadPoliciesImpl.DRIVER_NAMES;
+
+ public boolean driverSupported(String driver) {
+ return driverNames.stream().anyMatch(d ->
d.equalsIgnoreCase(driver));
+ }
+
+ public boolean isS3Driver(String driver) {
+ if (StringUtils.isEmpty(driver)) {
+ return false;
+ }
+ return driver.equalsIgnoreCase(driverNames.get(0)) ||
driver.equalsIgnoreCase(driverNames.get(1));
+ }
+
+ public boolean positiveCheck(String paramName, long value) {
+ if (value <= 0) {
+ throw new ParameterException(paramName + " is not be negative
or 0!");
+ }
+ return true;
+ }
+
+ public boolean maxValueCheck(String paramName, long value, long
maxValue) {
+ if (value > maxValue) {
+ throw new ParameterException(paramName + " is not bigger than
" + maxValue + "!");
+ }
+ return true;
+ }
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (!driverSupported(driver)) {
+ throw new ParameterException(
+ "The driver " + driver + " is not supported, "
+ + "(Possible values: " + String.join(",", driverNames) +
").");
+ }
+ if (isS3Driver(driver) && Strings.isNullOrEmpty(region) &&
Strings.isNullOrEmpty(endpoint)) {
+ throw new ParameterException(
+ "Either s3ManagedLedgerOffloadRegion or
s3ManagedLedgerOffloadServiceEndpoint must be set"
+ + " if s3 offload enabled");
+ }
+
+ int maxBlockSizeInBytes =
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+ if (StringUtils.isNotEmpty(maxBlockSizeStr)) {
+ long maxBlockSize = validateSizeString(maxBlockSizeStr);
+ if (positiveCheck("MaxBlockSize", maxBlockSize)
+ && maxValueCheck("MaxBlockSize", maxBlockSize,
Integer.MAX_VALUE)) {
+ maxBlockSizeInBytes =
Long.valueOf(maxBlockSize).intValue();
+ }
+ }
+
+ int readBufferSizeInBytes =
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
+ if (StringUtils.isNotEmpty(readBufferSizeStr)) {
+ long readBufferSize = validateSizeString(readBufferSizeStr);
+ if (positiveCheck("ReadBufferSize", readBufferSize)
+ && maxValueCheck("ReadBufferSize", readBufferSize,
Integer.MAX_VALUE)) {
+ readBufferSizeInBytes =
Long.valueOf(readBufferSize).intValue();
+ }
+ }
+
+ Long offloadAfterElapsedInMillis =
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+ if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
+ Long offloadAfterElapsed;
+ try {
+ offloadAfterElapsed = TimeUnit.SECONDS.toMillis(
+
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException(exception.getMessage());
+ }
+ if (maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed,
Long.MAX_VALUE)) {
+ offloadAfterElapsedInMillis = offloadAfterElapsed;
+ }
+ }
+
+ Long offloadAfterThresholdInBytes =
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
+ if (StringUtils.isNotEmpty(offloadAfterThresholdStr)) {
+ long offloadAfterThreshold =
validateSizeString(offloadAfterThresholdStr);
+ if (maxValueCheck("OffloadAfterThreshold",
offloadAfterThreshold, Long.MAX_VALUE)) {
+ offloadAfterThresholdInBytes = offloadAfterThreshold;
+ }
+ }
+
+ Long offloadThresholdInSeconds =
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
+ if (StringUtils.isNotEmpty(offloadAfterThresholdInSecondsStr)) {
+ Long offloadThresholdInSeconds0;
+ try {
+ offloadThresholdInSeconds0 = TimeUnit.SECONDS.toSeconds(
+
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterThresholdInSecondsStr.trim()));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException(exception.getMessage());
+ }
+ if (maxValueCheck("OffloadAfterThresholdInSeconds",
offloadThresholdInSeconds0, Long.MAX_VALUE)) {
+ offloadThresholdInSeconds = offloadThresholdInSeconds0;
+ }
+ }
+
OffloadedReadPriority offloadedReadPriority =
OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
if (this.offloadReadPriorityStr != null) {
@@ -2193,12 +2293,11 @@ public class CmdTopics extends CmdBase {
}
}
- OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
+ OffloadPolicies offloadPolicies =
OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
- maxBlockSizeInBytes,
- readBufferSizeInBytes, offloadThresholdInBytes,
offloadThresholdInSeconds,
- offloadDeletionLagInMillis, offloadedReadPriority);
+ maxBlockSizeInBytes, readBufferSizeInBytes,
offloadAfterThresholdInBytes,
+ offloadThresholdInSeconds, offloadAfterElapsedInMillis,
offloadedReadPriority);
getTopics().setOffloadPolicies(persistentTopic, offloadPolicies);
}