This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2530177  [Issue 7949][Tiered Storage] support aws creds per offload 
policies (#7950)
2530177 is described below

commit 2530177dac83124ea06194e0de788c53c34cf3e6
Author: Alexandre DUVAL <[email protected]>
AuthorDate: Fri Nov 6 06:01:28 2020 +0100

    [Issue 7949][Tiered Storage] support aws creds per offload policies (#7950)
    
    Fixes #7949
    
    ### Motivation
    
    Allow different S3 credentials to be configured in the offload policies of each namespace.
    
    ### Modifications
    
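    Add optional AWS credential id/secret parameters to OffloadPolicies.create and expose them as --aws-id/--aws-secret in the namespaces and topics CLI commands.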
    
    ### Does this pull request potentially affect one of the following parts:
    
      - The REST endpoints: yes, it adds options
      - The admin CLI options: yes, it adds options
    
    ### Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? docs
---
 .../apache/bookkeeper/mledger/impl/OffloadPrefixTest.java  |  1 +
 .../apache/pulsar/broker/admin/AdminApiOffloadTest.java    |  4 ++--
 .../org/apache/pulsar/broker/admin/NamespacesTest.java     |  4 ++++
 .../org/apache/pulsar/admin/cli/PulsarAdminToolTest.java   |  6 +++---
 .../java/org/apache/pulsar/admin/cli/CmdNamespaces.java    | 14 +++++++++++++-
 .../main/java/org/apache/pulsar/admin/cli/CmdTopics.java   | 10 +++++++++-
 .../pulsar/common/policies/data/OffloadPolicies.java       |  7 +++++++
 .../pulsar/common/policies/data/OffloadPoliciesTest.java   |  8 ++++++++
 site2/docs/reference-pulsar-admin.md                       |  2 ++
 9 files changed, 49 insertions(+), 7 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index ba2d678..807876e 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -991,6 +991,7 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         }
 
         OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", 
"",
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 9591f7a..bbda407 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -167,8 +167,8 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
         long offloadDeletionLagInMillis = 100L;
 
         OffloadPolicies offload1 = OffloadPolicies.create(
-                driver, region, bucket, endpoint, 100, 100,
-                offloadThresholdInBytes, offloadDeletionLagInMillis);
+                driver, region, bucket, endpoint, null, null,
+                100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
         admin.namespaces().setOffloadPolicies(namespaceName, offload1);
         OffloadPolicies offload2 = 
admin.namespaces().getOffloadPolicies(namespaceName);
         assertEquals(offload1, offload2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 21f98a5..8bf35ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1240,6 +1240,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // the ledger config should have the expected value
         ManagedLedgerConfig ledgerConf = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         MockLedgerOffloader offloader = new 
MockLedgerOffloader(OffloadPolicies.create("S3", "", "", "",
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1254,6 +1255,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         ledgerConf = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         admin.namespaces().getOffloadPolicies(namespace);
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", 
"", "",
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1267,6 +1269,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(-2, admin.namespaces().getOffloadThreshold(namespace));
         ledgerConf = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", 
"", "",
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
@@ -1280,6 +1283,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
         ledgerConf = 
pulsar.getBrokerService().getManagedLedgerConfig(topicName).get();
         offloader = new MockLedgerOffloader(OffloadPolicies.create("S3", "", 
"", "",
+                null, null,
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d4cff2b..4e45b90 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -543,8 +543,8 @@ public class PulsarAdminToolTest {
         namespaces.run(split("set-offload-policies myprop/clust/ns1 -r 
test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M 
-oat 10M -oae 10s"));
         verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
                 OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
-                        "http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 
1024,
-                        10L * 1024 * 1024, 10000L));
+                        "http://test.endpoint", null, null, 32 * 1024 * 1024, 
5 * 1024 * 1024,
+                        10 * 1024 * 1024L, 10000L));
 
         namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
         verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
@@ -753,7 +753,7 @@ public class PulsarAdminToolTest {
 
         cmdTopics.run(split("set-offload-policies 
persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 
-rb 9 -t 10"));
         OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", 
"region", "bucket"
-                , "endpoint", 8, 9, 10L, null);
+                , "endpoint", null, null, 8, 9, 10L, null);
         
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", 
offloadPolicies);
 
         cmdTopics.run(split("get-max-unacked-messages-on-consumer 
persistent://myprop/clust/ns1/ds1"));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 459e450..bf07b70 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1595,6 +1595,18 @@ public class CmdNamespaces extends CmdBase {
         private String endpoint;
 
         @Parameter(
+                names = {"--aws-id", "-i"},
+                description = "AWS Credential Id to use when using driver S3 
or aws-s3",
+                required = false)
+        private String awsId;
+
+        @Parameter(
+                names = {"--aws-secret", "-s"},
+                description = "AWS Credential Secret to use when using driver 
S3 or aws-s3",
+                required = false)
+        private String awsSecret;
+
+        @Parameter(
                 names = {"--maxBlockSize", "-mbs"},
                 description = "Max block size (eg: 32M, 64M), default is 64MB",
                 required = false)
@@ -1697,7 +1709,7 @@ public class CmdNamespaces extends CmdBase {
                 }
             }
 
-            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, 
region, bucket, endpoint,
+            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, 
region, bucket, endpoint, awsId, awsSecret,
                     maxBlockSizeInBytes, readBufferSizeInBytes, 
offloadAfterThresholdInBytes,
                     offloadAfterElapsedInMillis);
             admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 4cbf34e..d725129 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1225,6 +1225,14 @@ public class CmdTopics extends CmdBase {
                 , description = "ManagedLedger offload service endpoint, only 
s3 requires this parameter")
         private String endpoint;
 
+        @Parameter(names = {"-i", "--aws-id"}
+                , description = "AWS Credential Id to use when using driver S3 
or aws-s3")
+        private String awsId;
+
+        @Parameter(names = {"-s", "--aws-secret"}
+                , description = "AWS Credential Secret to use when using 
driver S3 or aws-s3")
+        private String awsSecret;
+
         @Parameter(names = {"-m", "--maxBlockSizeInBytes"}
                 , description = "ManagedLedger offload max block Size in 
bytes, s3 and google-cloud-storage requires this parameter")
         private int maxBlockSizeInBytes;
@@ -1244,7 +1252,7 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, 
region, bucket, endpoint, maxBlockSizeInBytes
+            OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, 
region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
                     , readBufferSizeInBytes, offloadThresholdInBytes, 
offloadDeletionLagInMillis);
             admin.topics().setOffloadPolicies(persistentTopic, 
offloadPolicies);
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 5d2103b..91cd2aa 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -134,6 +134,7 @@ public class OffloadPolicies implements Serializable {
     private Integer managedLedgerOffloadReadBufferSizeInBytes;
 
     public static OffloadPolicies create(String driver, String region, String 
bucket, String endpoint,
+                                         String credentialId, String 
credentialSecret,
                                          Integer maxBlockSizeInBytes, Integer 
readBufferSizeInBytes,
                                          Long offloadThresholdInBytes, Long 
offloadDeletionLagInMillis) {
         OffloadPolicies offloadPolicies = new OffloadPolicies();
@@ -148,6 +149,12 @@ public class OffloadPolicies implements Serializable {
         
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
 
         if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || 
driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
+            if (credentialId != null) {
+                offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
+            }
+            if (credentialSecret != null) {
+                
offloadPolicies.setS3ManagedLedgerOffloadRoleSessionName(credentialSecret);
+            }
             offloadPolicies.setS3ManagedLedgerOffloadRegion(region);
             offloadPolicies.setS3ManagedLedgerOffloadBucket(bucket);
             offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index cb5bc78..d87887d 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -41,6 +41,8 @@ public class OffloadPoliciesTest {
         final String driver = "aws-s3";
         final String region = "test-region";
         final String bucket = "test-bucket";
+        final String credentialId = "test-credential-id";
+        final String credentialSecret = "test-credential-secret";
         final String endPoint = "test-endpoint";
         final Integer maxBlockSizeInBytes = 5 * M;
         final Integer readBufferSizeInBytes = 2 * M;
@@ -52,6 +54,8 @@ public class OffloadPoliciesTest {
                 region,
                 bucket,
                 endPoint,
+                credentialId,
+                credentialSecret,
                 maxBlockSizeInBytes,
                 readBufferSizeInBytes,
                 offloadThresholdInBytes,
@@ -76,6 +80,8 @@ public class OffloadPoliciesTest {
         final String region = "test-region";
         final String bucket = "test-bucket";
         final String endPoint = "test-endpoint";
+        final String credentialId = "test-credential-id";
+        final String credentialSecret = "test-credential-secret";
         final Integer maxBlockSizeInBytes = 5 * M;
         final Integer readBufferSizeInBytes = 2 * M;
         final Long offloadThresholdInBytes = 0L;
@@ -86,6 +92,8 @@ public class OffloadPoliciesTest {
                 region,
                 bucket,
                 endPoint,
+                credentialId,
+                credentialSecret,
                 maxBlockSizeInBytes,
                 readBufferSizeInBytes,
                 offloadThresholdInBytes,
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index be4d946..8214427 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2555,6 +2555,8 @@ Options
 |`-r`, `--region`|The long term storage region||
 |`-b`, `--bucket`|Bucket to place offloaded ledger into||
 |`-e`, `--endpoint`|Alternative endpoint to connect to||
+|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
+|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or 
aws-s3||
 |`-mbs`, `--maxBlockSize`|Max block size|64MB|
 |`-rbs`, `--readBufferSize`|Read buffer size|1MB|
 |`-oat`, `--offloadAfterThreshold`|Offload after threshold size (eg: 1M, 5M)||

Reply via email to