This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2530177 [Issue 7949][Tiered Storage] support aws creds per offload
policies (#7950)
2530177 is described below
commit 2530177dac83124ea06194e0de788c53c34cf3e6
Author: Alexandre DUVAL <[email protected]>
AuthorDate: Fri Nov 6 06:01:28 2020 +0100
[Issue 7949][Tiered Storage] support aws creds per offload policies (#7950)
Fixes #7949
### Motivation
Allow different S3 credentials to be provided per offload policy on each namespace.
### Modifications
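Add credentialId/credentialSecret parameters to OffloadPolicies.create and expose them as the --aws-id/--aws-secret options (awsId/awsSecret) in the namespaces and topics admin CLI.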
### Does this pull request potentially affect one of the following parts:
- The REST endpoints: yes, it adds options
- The admin CLI options: yes, it adds options
### Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs
---
.../apache/bookkeeper/mledger/impl/OffloadPrefixTest.java | 1 +
.../apache/pulsar/broker/admin/AdminApiOffloadTest.java | 4 ++--
.../org/apache/pulsar/broker/admin/NamespacesTest.java | 4 ++++
.../org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 6 +++---
.../java/org/apache/pulsar/admin/cli/CmdNamespaces.java | 14 +++++++++++++-
.../main/java/org/apache/pulsar/admin/cli/CmdTopics.java | 10 +++++++++-
.../pulsar/common/policies/data/OffloadPolicies.java | 7 +++++++
.../pulsar/common/policies/data/OffloadPoliciesTest.java | 8 ++++++++
site2/docs/reference-pulsar-admin.md | 2 ++
9 files changed, 49 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index ba2d678..807876e 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -991,6 +991,7 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
}
OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "",
"",
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 9591f7a..bbda407 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -167,8 +167,8 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
long offloadDeletionLagInMillis = 100L;
OffloadPolicies offload1 = OffloadPolicies.create(
- driver, region, bucket, endpoint, 100, 100,
- offloadThresholdInBytes, offloadDeletionLagInMillis);
+ driver, region, bucket, endpoint, null, null,
+ 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 =
admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 21f98a5..8bf35ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1240,6 +1240,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// the ledger config should have the expected value
ManagedLedgerConfig ledgerConf =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
MockLedgerOffloader offloader = new
MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1254,6 +1255,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
ledgerConf =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
admin.namespaces().getOffloadPolicies(namespace);
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "",
"", "",
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1267,6 +1269,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "",
"", "",
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
@@ -1280,6 +1283,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
ledgerConf =
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "",
"", "",
+ null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d4cff2b..4e45b90 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -543,8 +543,8 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-offload-policies myprop/clust/ns1 -r
test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M
-oat 10M -oae 10s"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
- "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 *
1024,
- 10L * 1024 * 1024, 10000L));
+ "http://test.endpoint", null, null, 32 * 1024 * 1024,
5 * 1024 * 1024,
+ 10 * 1024 * 1024L, 10000L));
namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
@@ -753,7 +753,7 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("set-offload-policies
persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8
-rb 9 -t 10"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3",
"region", "bucket"
- , "endpoint", 8, 9, 10L, null);
+ , "endpoint", null, null, 8, 9, 10L, null);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
offloadPolicies);
cmdTopics.run(split("get-max-unacked-messages-on-consumer
persistent://myprop/clust/ns1/ds1"));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 459e450..bf07b70 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1595,6 +1595,18 @@ public class CmdNamespaces extends CmdBase {
private String endpoint;
@Parameter(
+ names = {"--aws-id", "-i"},
+ description = "AWS Credential Id to use when using driver S3
or aws-s3",
+ required = false)
+ private String awsId;
+
+ @Parameter(
+ names = {"--aws-secret", "-s"},
+ description = "AWS Credential Secret to use when using driver
S3 or aws-s3",
+ required = false)
+ private String awsSecret;
+
+ @Parameter(
names = {"--maxBlockSize", "-mbs"},
description = "Max block size (eg: 32M, 64M), default is 64MB",
required = false)
@@ -1697,7 +1709,7 @@ public class CmdNamespaces extends CmdBase {
}
}
- OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint,
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, awsId, awsSecret,
maxBlockSizeInBytes, readBufferSizeInBytes,
offloadAfterThresholdInBytes,
offloadAfterElapsedInMillis);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 4cbf34e..d725129 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1225,6 +1225,14 @@ public class CmdTopics extends CmdBase {
, description = "ManagedLedger offload service endpoint, only
s3 requires this parameter")
private String endpoint;
+ @Parameter(names = {"-i", "--aws-id"}
+ , description = "AWS Credential Id to use when using driver S3
or aws-s3")
+ private String awsId;
+
+ @Parameter(names = {"-s", "--aws-secret"}
+ , description = "AWS Credential Secret to use when using
driver S3 or aws-s3")
+ private String awsSecret;
+
@Parameter(names = {"-m", "--maxBlockSizeInBytes"}
, description = "ManagedLedger offload max block Size in
bytes, s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
@@ -1244,7 +1252,7 @@ public class CmdTopics extends CmdBase {
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, maxBlockSizeInBytes
+ OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
, readBufferSizeInBytes, offloadThresholdInBytes,
offloadDeletionLagInMillis);
admin.topics().setOffloadPolicies(persistentTopic,
offloadPolicies);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 5d2103b..91cd2aa 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -134,6 +134,7 @@ public class OffloadPolicies implements Serializable {
private Integer managedLedgerOffloadReadBufferSizeInBytes;
public static OffloadPolicies create(String driver, String region, String
bucket, String endpoint,
+ String credentialId, String
credentialSecret,
Integer maxBlockSizeInBytes, Integer
readBufferSizeInBytes,
Long offloadThresholdInBytes, Long
offloadDeletionLagInMillis) {
OffloadPolicies offloadPolicies = new OffloadPolicies();
@@ -148,6 +149,12 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) ||
driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
+ if (credentialId != null) {
+ offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
+ }
+ if (credentialSecret != null) {
+
offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(credentialSecret);
+ }
offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index cb5bc78..d87887d 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -41,6 +41,8 @@ public class OffloadPoliciesTest {
final String driver = "aws-s3";
final String region = "test-region";
final String bucket = "test-bucket";
+ final String credentialId = "test-credential-id";
+ final String credentialSecret = "test-credential-secret";
final String endPoint = "test-endpoint";
final Integer maxBlockSizeInBytes = 5 * M;
final Integer readBufferSizeInBytes = 2 * M;
@@ -52,6 +54,8 @@ public class OffloadPoliciesTest {
region,
bucket,
endPoint,
+ credentialId,
+ credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
@@ -76,6 +80,8 @@ public class OffloadPoliciesTest {
final String region = "test-region";
final String bucket = "test-bucket";
final String endPoint = "test-endpoint";
+ final String credentialId = "test-credential-id";
+ final String credentialSecret = "test-credential-secret";
final Integer maxBlockSizeInBytes = 5 * M;
final Integer readBufferSizeInBytes = 2 * M;
final Long offloadThresholdInBytes = 0L;
@@ -86,6 +92,8 @@ public class OffloadPoliciesTest {
region,
bucket,
endPoint,
+ credentialId,
+ credentialSecret,
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
diff --git a/site2/docs/reference-pulsar-admin.md
b/site2/docs/reference-pulsar-admin.md
index be4d946..8214427 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2555,6 +2555,8 @@ Options
|`-r`, `--region`|The long term storage region||
|`-b`, `--bucket`|Bucket to place offloaded ledger into||
|`-e`, `--endpoint`|Alternative endpoint to connect to||
+|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
+|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or
aws-s3||
|`-mbs`, `--maxBlockSize`|Max block size|64MB|
|`-rbs`, `--readBufferSize`|Read buffer size|1MB|
|`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||