This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de05a9b  GCS offload support(3): add configs to support GCS driver 
(#2151)
de05a9b is described below

commit de05a9b8e7bdf43d24b0f9ff6e665501d61d6d5e
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Jul 25 02:33:37 2018 +0800

    GCS offload support(3): add configs to support GCS driver (#2151)
    
    This is the third part of Google Cloud Storage offload support.
    It adds support for GCS-related configuration in `ManagedLedgerOffloader`. It is based on PR #2065; please review only the commits after
[e5b1f7a](https://github.com/apache/incubator-pulsar/pull/2065/commits/e5b1f7aad4fc1f0d8f4c61b69806100a38774633).
    
    It currently passes a real test against GCS with the test case
[ManagedLedgerOffloaderTest#testGcsRealOffload](https://github.com/apache/incubator-pulsar/pull/2151/commits/eda5097f110acfd3fc5ee518dd8bc16771707c27#diff-6387a2ab4cf9c9243135b5a34c8522efR584).
    Because there is no GCS mock Docker image, a later PR will try to use real GCS for the integration test.
    
    Master Issue: #2067
---
 conf/broker.conf                                   |  19 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  59 +++++++++
 .../impl/BlobStoreManagedLedgerOffloader.java      | 132 ++++++++++++++++-----
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  50 ++++++++
 4 files changed, 229 insertions(+), 31 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 96433aa..11a87d5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -487,6 +487,8 @@ 
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 ### --- Ledger Offloading --- ###
 
 # Driver to use to offload old data to long term storage (Possible values: S3, 
aws-s3, google-cloud-storage)
+# When using google-cloud-storage, make sure both the Google Cloud Storage API and the Google Cloud Storage JSON API are enabled for
+# the project (check in the Developers Console under APIs & auth -> APIs).
 managedLedgerOffloadDriver=
 
 # Maximum number of thread pool threads for ledger offloading
@@ -507,6 +509,23 @@ s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 # For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
+# For Google Cloud Storage ledger offload, region where offload bucket is 
located.
+# reference this page for more details: 
https://cloud.google.com/storage/docs/bucket-locations
+gcsManagedLedgerOffloadRegion=
+
+# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
+gcsManagedLedgerOffloadBucket=
+
+# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
+
+# For Google Cloud Storage, path to json file containing service account 
credentials.
+# For more details, see the "Service Accounts" section of 
https://support.google.com/googleapi/answer/6158849
+gcsManagedLedgerOffloadServiceAccountKeyFile=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1bb6fff..182bc09 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -502,6 +502,25 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(minValue = 1024)
     private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 
1MB
 
+    // For Google Cloud Storage ledger offload, region where offload bucket is 
located.
+    // reference this page for more details: 
https://cloud.google.com/storage/docs/bucket-locations
+    private String gcsManagedLedgerOffloadRegion = null;
+
+    // For Google Cloud Storage ledger offload, Bucket to place offloaded 
ledger into
+    private String gcsManagedLedgerOffloadBucket = null;
+
+    // For Google Cloud Storage ledger offload, Max block size in bytes.
+    @FieldContext(minValue = 5242880) // 5MB
+    private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; 
// 64MB
+
+    // For Google Cloud Storage ledger offload, Read buffer size in bytes.
+    @FieldContext(minValue = 1024)
+    private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 
1MB
+
+    // For Google Cloud Storage, path to json file containing service account 
credentials.
+    // For more details, see the "Service Accounts" section of 
https://support.google.com/googleapi/answer/6158849
+    private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1733,6 +1752,46 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
     }
 
+    public void setGcsManagedLedgerOffloadRegion(String region) {
+        this.gcsManagedLedgerOffloadRegion = region;
+    }
+
+    public String getGcsManagedLedgerOffloadRegion() {
+        return this.gcsManagedLedgerOffloadRegion;
+    }
+
+    public void setGcsManagedLedgerOffloadBucket(String bucket) {
+        this.gcsManagedLedgerOffloadBucket = bucket;
+    }
+
+    public String getGcsManagedLedgerOffloadBucket() {
+        return this.gcsManagedLedgerOffloadBucket;
+    }
+
+    public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int 
blockSizeInBytes) {
+        this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
+    }
+
+    public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
+        return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
+    }
+
+    public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int 
readBufferSizeInBytes) {
+        this.gcsManagedLedgerOffloadReadBufferSizeInBytes = 
readBufferSizeInBytes;
+    }
+
+    public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
+        return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
+    }
+
+    public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String 
keyPath) {
+        this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
+    }
+
+    public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
+        return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
+    }
+
     public void setBrokerServiceCompactionMonitorIntervalInSeconds(int 
interval) {
         this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index 528ba28..418c14f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.broker.offload.impl;
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -49,9 +53,11 @@ import org.jclouds.blobstore.domain.BlobBuilder;
 import org.jclouds.blobstore.domain.MultipartPart;
 import org.jclouds.blobstore.domain.MultipartUpload;
 import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Credentials;
 import org.jclouds.domain.Location;
 import org.jclouds.domain.LocationBuilder;
 import org.jclouds.domain.LocationScope;
+import org.jclouds.googlecloud.GoogleCredentialsFromJson;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
 import org.jclouds.s3.reference.S3Constants;
@@ -63,6 +69,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
     public static final String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage"};
 
+    // use these keys for both s3 and gcs.
     static final String METADATA_FORMAT_VERSION_KEY = 
"S3ManagedLedgerOffloaderFormatVersion";
     static final String METADATA_SOFTWARE_VERSION_KEY = 
"S3ManagedLedgerOffloaderSoftwareVersion";
     static final String METADATA_SOFTWARE_GITSHA_KEY = 
"S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -72,11 +79,19 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         return Arrays.stream(DRIVER_NAMES).anyMatch(d -> 
d.equalsIgnoreCase(driver));
     }
 
+    public static boolean isS3Driver(String driver) {
+        return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || 
driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+    }
+
+    public static boolean isGcsDriver(String driver) {
+        return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
+    }
+
     private static void addVersionInfo(BlobBuilder blobBuilder) {
         blobBuilder.userMetadata(ImmutableMap.of(
-            METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
-            METADATA_SOFTWARE_VERSION_KEY, 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
-            METADATA_SOFTWARE_GITSHA_KEY, 
PulsarBrokerVersionStringUtils.getGitSha()));
+            METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION,
+            METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+            METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), 
PulsarBrokerVersionStringUtils.getGitSha()));
     }
 
     private final VersionCheck VERSION_CHECK = (key, blob) -> {
@@ -104,30 +119,91 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                                          OrderedScheduler 
scheduler)
             throws PulsarServerException {
         String driver = conf.getManagedLedgerOffloadDriver();
-        String region = conf.getS3ManagedLedgerOffloadRegion();
-        String bucket = conf.getS3ManagedLedgerOffloadBucket();
-        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
-        int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
-        int readBufferSize = 
conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
+        if (!driverSupported(driver)) {
+            throw new PulsarServerException(
+                "Not support this kind of driver as offload backend: " + 
driver);
+        }
 
-        if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+        String region = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadRegion() :
+            conf.getGcsManagedLedgerOffloadRegion();
+        String bucket = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadBucket() :
+            conf.getGcsManagedLedgerOffloadBucket();
+        int maxBlockSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes();
+        int readBufferSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();
+
+        if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && 
Strings.isNullOrEmpty(endpoint)) {
             throw new PulsarServerException(
                     "Either s3ManagedLedgerOffloadRegion or 
s3ManagedLedgerOffloadServiceEndpoint must be set"
                     + " if s3 offload enabled");
         }
+
         if (Strings.isNullOrEmpty(bucket)) {
-            throw new PulsarServerException("s3ManagedLedgerOffloadBucket 
cannot be empty if s3 offload enabled");
+            throw new PulsarServerException(
+                "ManagedLedgerOffloadBucket cannot be empty for s3 and gcs 
offload");
         }
         if (maxBlockSize < 5*1024*1024) {
-            throw new 
PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less 
than 5MB");
+            throw new PulsarServerException(
+                "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 
5MB for s3 and gcs offload");
         }
 
-        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, 
maxBlockSize, readBufferSize, endpoint, region);
+        Credentials credentials = getCredentials(driver, conf);
+
+        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, 
maxBlockSize, readBufferSize, endpoint, region, credentials);
+    }
+
+    public static Credentials getCredentials(String driver, 
ServiceConfiguration conf) throws PulsarServerException {
+        // credentials:
+        //   for s3, get by DefaultAWSCredentialsProviderChain.
+        //   for gcs, use downloaded file 'google_creds.json', which contains 
service account key by
+        //     following instructions in page 
https://support.google.com/googleapi/answer/6158849
+
+        if (isGcsDriver(driver)) {
+            String gcsKeyPath = 
conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
+            if (Strings.isNullOrEmpty(gcsKeyPath)) {
+                throw new PulsarServerException(
+                    "The service account key path is empty for GCS driver");
+            }
+            try {
+                String gcsKeyContent = Files.toString(new File(gcsKeyPath), 
Charset.defaultCharset());
+                return new GoogleCredentialsFromJson(gcsKeyContent).get();
+            } catch (IOException ioe) {
+                log.error("Cannot read GCS service account credentials file: 
{}", gcsKeyPath);
+                throw new PulsarServerException(ioe);
+            }
+        } else if (isS3Driver(driver)) {
+            AWSCredentials credentials = null;
+            try {
+                DefaultAWSCredentialsProviderChain creds = 
DefaultAWSCredentialsProviderChain.getInstance();
+                credentials = creds.getCredentials();
+            } catch (Exception e) {
+                // allowed, some mock s3 service not need credential
+                log.error("Exception when get credentials for s3 ", e);
+                throw new PulsarServerException(e);
+            }
+
+            String id = "accesskey";
+            String key = "secretkey";
+            if (credentials != null) {
+                id = credentials.getAWSAccessKeyId();
+                key = credentials.getAWSSecretKey();
+            }
+            return new Credentials(id, key);
+        } else {
+            throw new PulsarServerException(
+                "Not support this kind of driver: " + driver);
+        }
     }
 
     // build context for jclouds BlobStoreContext
     BlobStoreManagedLedgerOffloader(String driver, String container, 
OrderedScheduler scheduler,
-                                    int maxBlockSize, int readBufferSize, 
String endpoint, String region) {
+                           int maxBlockSize, int readBufferSize, String 
endpoint, String region, Credentials credentials) {
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
 
@@ -142,24 +218,9 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
 
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+        contextBuilder.credentials(credentials.identity, 
credentials.credential);
 
-        AWSCredentials credentials = null;
-        try {
-            DefaultAWSCredentialsProviderChain creds = 
DefaultAWSCredentialsProviderChain.getInstance();
-            credentials = creds.getCredentials();
-        } catch (Exception e) {
-            log.error("Exception when get credentials for s3 ", e);
-        }
-
-        String id = "accesskey";
-        String key = "secretkey";
-        if (credentials != null) {
-            id = credentials.getAWSAccessKeyId();
-            key = credentials.getAWSSecretKey();
-        }
-        contextBuilder.credentials(id, key);
-
-        if (!Strings.isNullOrEmpty(endpoint)) {
+        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
             contextBuilder.endpoint(endpoint);
             
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
@@ -174,7 +235,8 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         this.blobStore = context.getBlobStore();
     }
 
-    // build context for jclouds BlobStoreContext
+    // build context for jclouds BlobStoreContext, mostly used in test
+    @VisibleForTesting
     BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, 
OrderedScheduler scheduler,
                                     int maxBlockSize, int readBufferSize) {
         this.scheduler = scheduler;
@@ -192,6 +254,14 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
     }
 
+    public boolean createBucket() {
+        return blobStore.createContainerInLocation(location, bucket);
+    }
+
+    public void deleteBucket() {
+        blobStore.deleteContainer(bucket);
+    }
+
     // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new 
Block,
     @Override
     public CompletableFuture<Void> offload(ReadHandle readHandle,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 19463d2..86b4657 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -180,6 +181,55 @@ class BlobStoreManagedLedgerOffloaderTest extends 
BlobStoreTestBase {
     }
 
     @Test
+    public void testGcsNoKeyPath() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
+    @Test
+    public void testGcsNoBucketConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+        
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
+    @Test
+    public void testGcsSmallBlockSizeConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+        
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+        conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+        conf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(1024);
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
+    @Test
     public void testOffloadAndRead() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
         LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,

Reply via email to