sijie closed pull request #2151: GCS offload support(3): add configs to support 
GCS driver
URL: https://github.com/apache/incubator-pulsar/pull/2151
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 96433aa517..11a87d5071 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -487,6 +487,8 @@ 
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 ### --- Ledger Offloading --- ###
 
 # Driver to use to offload old data to long term storage (Possible values: S3, 
aws-s3, google-cloud-storage)
+# When using google-cloud-storage, make sure both the Google Cloud Storage API and the Google Cloud Storage JSON API are enabled for
+# the project (check in Developers Console -> APIs & auth -> APIs).
 managedLedgerOffloadDriver=
 
 # Maximum number of thread pool threads for ledger offloading
@@ -507,6 +509,23 @@ s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 # For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
+# For Google Cloud Storage ledger offload, the region where the offload bucket is located.
+# Reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
+gcsManagedLedgerOffloadRegion=
+
+# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
+gcsManagedLedgerOffloadBucket=
+
+# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
+
+# For Google Cloud Storage ledger offload, path to the JSON file containing the service account credentials.
+# For more details, see the "Service Accounts" section of 
https://support.google.com/googleapi/answer/6158849
+gcsManagedLedgerOffloadServiceAccountKeyFile=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1bb6fff168..182bc0939f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -502,6 +502,25 @@
     @FieldContext(minValue = 1024)
     private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 
1MB
 
+    // For Google Cloud Storage ledger offload, the region where the offload bucket is located.
+    // Reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations
+    private String gcsManagedLedgerOffloadRegion = null;
+
+    // For Google Cloud Storage ledger offload, Bucket to place offloaded 
ledger into
+    private String gcsManagedLedgerOffloadBucket = null;
+
+    // For Google Cloud Storage ledger offload, Max block size in bytes.
+    @FieldContext(minValue = 5242880) // 5MB
+    private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; 
// 64MB
+
+    // For Google Cloud Storage ledger offload, Read buffer size in bytes.
+    @FieldContext(minValue = 1024)
+    private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 
1MB
+
+    // For Google Cloud Storage, path to json file containing service account 
credentials.
+    // For more details, see the "Service Accounts" section of 
https://support.google.com/googleapi/answer/6158849
+    private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1733,6 +1752,46 @@ public int 
getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
         return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
     }
 
+    public void setGcsManagedLedgerOffloadRegion(String region) {
+        this.gcsManagedLedgerOffloadRegion = region;
+    }
+
+    public String getGcsManagedLedgerOffloadRegion() {
+        return this.gcsManagedLedgerOffloadRegion;
+    }
+
+    public void setGcsManagedLedgerOffloadBucket(String bucket) {
+        this.gcsManagedLedgerOffloadBucket = bucket;
+    }
+
+    public String getGcsManagedLedgerOffloadBucket() {
+        return this.gcsManagedLedgerOffloadBucket;
+    }
+
+    public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int 
blockSizeInBytes) {
+        this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
+    }
+
+    public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
+        return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
+    }
+
+    public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int 
readBufferSizeInBytes) {
+        this.gcsManagedLedgerOffloadReadBufferSizeInBytes = 
readBufferSizeInBytes;
+    }
+
+    public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
+        return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
+    }
+
+    public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String 
keyPath) {
+        this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
+    }
+
+    public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
+        return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
+    }
+
     public void setBrokerServiceCompactionMonitorIntervalInSeconds(int 
interval) {
         this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index 528ba28348..418c14f04a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -20,11 +20,15 @@
 
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -49,9 +53,11 @@
 import org.jclouds.blobstore.domain.MultipartPart;
 import org.jclouds.blobstore.domain.MultipartUpload;
 import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Credentials;
 import org.jclouds.domain.Location;
 import org.jclouds.domain.LocationBuilder;
 import org.jclouds.domain.LocationScope;
+import org.jclouds.googlecloud.GoogleCredentialsFromJson;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
 import org.jclouds.s3.reference.S3Constants;
@@ -63,6 +69,7 @@
 
     public static final String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage"};
 
+    // use these keys for both s3 and gcs.
     static final String METADATA_FORMAT_VERSION_KEY = 
"S3ManagedLedgerOffloaderFormatVersion";
     static final String METADATA_SOFTWARE_VERSION_KEY = 
"S3ManagedLedgerOffloaderSoftwareVersion";
     static final String METADATA_SOFTWARE_GITSHA_KEY = 
"S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -72,11 +79,19 @@ public static boolean driverSupported(String driver) {
         return Arrays.stream(DRIVER_NAMES).anyMatch(d -> 
d.equalsIgnoreCase(driver));
     }
 
+    public static boolean isS3Driver(String driver) {
+        return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || 
driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+    }
+
+    public static boolean isGcsDriver(String driver) {
+        return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
+    }
+
     private static void addVersionInfo(BlobBuilder blobBuilder) {
         blobBuilder.userMetadata(ImmutableMap.of(
-            METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
-            METADATA_SOFTWARE_VERSION_KEY, 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
-            METADATA_SOFTWARE_GITSHA_KEY, 
PulsarBrokerVersionStringUtils.getGitSha()));
+            METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION,
+            METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+            METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), 
PulsarBrokerVersionStringUtils.getGitSha()));
     }
 
     private final VersionCheck VERSION_CHECK = (key, blob) -> {
@@ -104,30 +119,91 @@ public static BlobStoreManagedLedgerOffloader 
create(ServiceConfiguration conf,
                                                          OrderedScheduler 
scheduler)
             throws PulsarServerException {
         String driver = conf.getManagedLedgerOffloadDriver();
-        String region = conf.getS3ManagedLedgerOffloadRegion();
-        String bucket = conf.getS3ManagedLedgerOffloadBucket();
-        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
-        int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
-        int readBufferSize = 
conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
+        if (!driverSupported(driver)) {
+            throw new PulsarServerException(
+                "Not support this kind of driver as offload backend: " + 
driver);
+        }
 
-        if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+        String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+        String region = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadRegion() :
+            conf.getGcsManagedLedgerOffloadRegion();
+        String bucket = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadBucket() :
+            conf.getGcsManagedLedgerOffloadBucket();
+        int maxBlockSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes();
+        int readBufferSize = isS3Driver(driver) ?
+            conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() :
+            conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();
+
+        if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && 
Strings.isNullOrEmpty(endpoint)) {
             throw new PulsarServerException(
                     "Either s3ManagedLedgerOffloadRegion or 
s3ManagedLedgerOffloadServiceEndpoint must be set"
                     + " if s3 offload enabled");
         }
+
         if (Strings.isNullOrEmpty(bucket)) {
-            throw new PulsarServerException("s3ManagedLedgerOffloadBucket 
cannot be empty if s3 offload enabled");
+            throw new PulsarServerException(
+                "ManagedLedgerOffloadBucket cannot be empty for s3 and gcs 
offload");
         }
         if (maxBlockSize < 5*1024*1024) {
-            throw new 
PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less 
than 5MB");
+            throw new PulsarServerException(
+                "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 
5MB for s3 and gcs offload");
         }
 
-        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, 
maxBlockSize, readBufferSize, endpoint, region);
+        Credentials credentials = getCredentials(driver, conf);
+
+        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, 
maxBlockSize, readBufferSize, endpoint, region, credentials);
+    }
+
+    public static Credentials getCredentials(String driver, 
ServiceConfiguration conf) throws PulsarServerException {
+        // credentials:
+        //   for s3, obtained from DefaultAWSCredentialsProviderChain.
+        //   for gcs, read from the service account key JSON file configured by gcsManagedLedgerOffloadServiceAccountKeyFile;
+        //     create that key by following the instructions at https://support.google.com/googleapi/answer/6158849
+
+        if (isGcsDriver(driver)) {
+            String gcsKeyPath = 
conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
+            if (Strings.isNullOrEmpty(gcsKeyPath)) {
+                throw new PulsarServerException(
+                    "The service account key path is empty for GCS driver");
+            }
+            try {
+                String gcsKeyContent = Files.toString(new File(gcsKeyPath), 
Charset.defaultCharset());
+                return new GoogleCredentialsFromJson(gcsKeyContent).get();
+            } catch (IOException ioe) {
+                log.error("Cannot read GCS service account credentials file: 
{}", gcsKeyPath);
+                throw new PulsarServerException(ioe);
+            }
+        } else if (isS3Driver(driver)) {
+            AWSCredentials credentials = null;
+            try {
+                DefaultAWSCredentialsProviderChain creds = 
DefaultAWSCredentialsProviderChain.getInstance();
+                credentials = creds.getCredentials();
+            } catch (Exception e) {
+                // NOTE(review): this now fails fast, although some mock s3 services need no credentials (the previous code only logged and continued)
+                log.error("Exception when get credentials for s3 ", e);
+                throw new PulsarServerException(e);
+            }
+
+            String id = "accesskey";
+            String key = "secretkey";
+            if (credentials != null) {
+                id = credentials.getAWSAccessKeyId();
+                key = credentials.getAWSSecretKey();
+            }
+            return new Credentials(id, key);
+        } else {
+            throw new PulsarServerException(
+                "Not support this kind of driver: " + driver);
+        }
     }
 
     // build context for jclouds BlobStoreContext
     BlobStoreManagedLedgerOffloader(String driver, String container, 
OrderedScheduler scheduler,
-                                    int maxBlockSize, int readBufferSize, 
String endpoint, String region) {
+                           int maxBlockSize, int readBufferSize, String 
endpoint, String region, Credentials credentials) {
         this.scheduler = scheduler;
         this.readBufferSize = readBufferSize;
 
@@ -142,24 +218,9 @@ public static BlobStoreManagedLedgerOffloader 
create(ServiceConfiguration conf,
         overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
 
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+        contextBuilder.credentials(credentials.identity, 
credentials.credential);
 
-        AWSCredentials credentials = null;
-        try {
-            DefaultAWSCredentialsProviderChain creds = 
DefaultAWSCredentialsProviderChain.getInstance();
-            credentials = creds.getCredentials();
-        } catch (Exception e) {
-            log.error("Exception when get credentials for s3 ", e);
-        }
-
-        String id = "accesskey";
-        String key = "secretkey";
-        if (credentials != null) {
-            id = credentials.getAWSAccessKeyId();
-            key = credentials.getAWSSecretKey();
-        }
-        contextBuilder.credentials(id, key);
-
-        if (!Strings.isNullOrEmpty(endpoint)) {
+        if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
             contextBuilder.endpoint(endpoint);
             
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
@@ -174,7 +235,8 @@ public static BlobStoreManagedLedgerOffloader 
create(ServiceConfiguration conf,
         this.blobStore = context.getBlobStore();
     }
 
-    // build context for jclouds BlobStoreContext
+    // build context for jclouds BlobStoreContext, mostly used in test
+    @VisibleForTesting
     BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, 
OrderedScheduler scheduler,
                                     int maxBlockSize, int readBufferSize) {
         this.scheduler = scheduler;
@@ -192,6 +254,14 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
         return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
     }
 
+    public boolean createBucket() {
+        return blobStore.createContainerInLocation(location, bucket);
+    }
+
+    public void deleteBucket() {
+        blobStore.deleteContainer(bucket);
+    }
+
     // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new 
Block,
     @Override
     public CompletableFuture<Void> offload(ReadHandle readHandle,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 19463d2fc0..86b4657384 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -26,6 +26,7 @@
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -179,6 +180,55 @@ public void testSmallBlockSizeConfigured() throws 
Exception {
         }
     }
 
+    @Test
+    public void testGcsNoKeyPath() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
+    @Test
+    public void testGcsNoBucketConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+        
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
+    @Test
+    public void testGcsSmallBlockSizeConfigured() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+        File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+        
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+        conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+        conf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(1024);
+
+        try {
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+            Assert.fail("Should have thrown exception");
+        } catch (PulsarServerException pse) {
+            // correct
+            log.error("Expected pse", pse);
+        }
+    }
+
     @Test
     public void testOffloadAndRead() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to