This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ad39f  [Issue 8887][tiered-storage-jcloud] support tiered-storage 
provider by aliyun OSS (#8985)
98ad39f is described below

commit 98ad39ffa51239e389c73411dfb8df7f5592a5aa
Author: wangyufan <[email protected]>
AuthorDate: Tue Dec 29 17:08:16 2020 +0800

    [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by 
aliyun OSS (#8985)
    
    [Issue 8887][tiered-storage-jcloud] support tiered-storage provider by 
aliyun OSS
---
 .../common/policies/data/OffloadPolicies.java      |  4 +-
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 81 +++++++++++++++++++++-
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 422bf24..4c5058b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -60,7 +60,9 @@ public class OffloadPolicies implements Serializable {
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage", "filesystem", "azureblob"};
+    public final static String[] DRIVER_NAMES = {
+            "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", 
"aliyun-oss"
+    };
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 9d0871e..ba7065e 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -33,6 +33,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Properties;
 import java.util.UUID;
 
 import lombok.extern.slf4j.Slf4j;
@@ -57,6 +58,8 @@ import org.jclouds.googlecloud.GoogleCredentialsFromJson;
 import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
 import org.jclouds.providers.AnonymousProviderMetadata;
 import org.jclouds.providers.ProviderMetadata;
+import org.jclouds.s3.S3ApiMetadata;
+import org.jclouds.s3.reference.S3Constants;
 
 /**
  * Enumeration of the supported JCloud Blob Store Providers.
@@ -162,6 +165,28 @@ public enum JCloudBlobStoreProvider implements 
Serializable, ConfigValidation, B
         }
     },
 
+
+    /**
+     * Aliyun OSS is compatible with the S3 API
+     * https://www.alibabacloud.com/help/doc-detail/64919.htm
+     */
+    ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new 
S3ApiMetadata(), "")) {
+        @Override
+        public void validate(TieredStorageConfiguration config) throws 
IllegalArgumentException {
+            ALIYUN_OSS_VALIDATION.validate(config);
+        }
+
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
     TRANSIENT("transient", new AnonymousProviderMetadata(new 
TransientApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws 
IllegalArgumentException {
@@ -177,7 +202,7 @@ public enum JCloudBlobStoreProvider implements 
Serializable, ConfigValidation, B
             ContextBuilder builder =  ContextBuilder.newBuilder("transient");
             BlobStoreContext ctx = builder
                     .buildView(BlobStoreContext.class);
-            
+
             BlobStore bs = ctx.getBlobStore();
 
             if (!bs.containerExists(config.getBucket())) {
@@ -312,4 +337,58 @@ public enum JCloudBlobStoreProvider implements 
Serializable, ConfigValidation, B
             }
         }
     };
+
+    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = 
(TieredStorageConfiguration config) -> {
+        ContextBuilder contextBuilder = 
ContextBuilder.newBuilder(config.getProviderMetadata());
+        Properties overrides = config.getOverrides();
+        // For security reasons, OSS supports only virtual hosted style access.
+        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, 
"true");
+        contextBuilder.overrides(overrides);
+        contextBuilder.endpoint(config.getServiceEndpoint());
+
+        if (config.getProviderCredentials() != null) {
+            return contextBuilder
+                    .credentialsSupplier(config.getCredentials())
+                    .buildView(BlobStoreContext.class)
+                    .getBlobStore();
+        } else {
+            log.warn("The credentials is null. driver: {}, bucket: {}", 
config.getDriver(), config.getBucket());
+            return contextBuilder
+                    .buildView(BlobStoreContext.class)
+                    .getBlobStore();
+        }
+    };
+
+    static final ConfigValidation ALIYUN_OSS_VALIDATION = 
(TieredStorageConfiguration config) -> {
+        if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
+            throw new IllegalArgumentException(
+                    "ServiceEndpoint must specified for " + config.getDriver() 
+ " offload");
+        }
+
+        if (Strings.isNullOrEmpty(config.getBucket())) {
+            throw new IllegalArgumentException(
+                    "Bucket cannot be empty for " + config.getDriver() + " 
offload");
+        }
+
+        if (config.getMaxBlockSizeInBytes() < (5 * 1024 * 1024)) {
+            throw new IllegalArgumentException(
+                    "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less 
than 5MB for "
+                            + config.getDriver() + " offload");
+        }
+    };
+
+    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = 
(TieredStorageConfiguration config) -> {
+        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
+        if (StringUtils.isEmpty(accountName)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss 
access key id.");
+        }
+        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
+        if (StringUtils.isEmpty(accountKey)) {
+            throw new IllegalArgumentException("Couldn't get the aliyun oss 
access key secret.");
+        }
+        Credentials credentials = new Credentials(
+                accountName, accountKey);
+        config.setProviderCredentials(() -> credentials);
+    };
+
 }
\ No newline at end of file

Reply via email to