This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de05a9b GCS offload support(3): add configs to support GCS driver
(#2151)
de05a9b is described below
commit de05a9b8e7bdf43d24b0f9ff6e665501d61d6d5e
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Jul 25 02:33:37 2018 +0800
GCS offload support(3): add configs to support GCS driver (#2151)
This is the third part of the work to support Google Cloud Storage offload.
It aims to support GCS-related config in `ManagedLedgerOffloader`. It is
based on PR #2065. Please only review the commits after
[e5b1f7a](https://github.com/apache/incubator-pulsar/pull/2065/commits/e5b1f7aad4fc1f0d8f4c61b69806100a38774633).
It currently passes a real test against GCS, via the test case
[ManagedLedgerOffloaderTest#testGcsRealOffload](https://github.com/apache/incubator-pulsar/pull/2151/commits/eda5097f110acfd3fc5ee518dd8bc16771707c27#diff-6387a2ab4cf9c9243135b5a34c8522efR584).
Since there is no GCS mock docker image, a later PR will try to use real GCS
for the integration test.
Master Issue: #2067
---
conf/broker.conf | 19 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 59 +++++++++
.../impl/BlobStoreManagedLedgerOffloader.java | 132 ++++++++++++++++-----
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 50 ++++++++
4 files changed, 229 insertions(+), 31 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 96433aa..11a87d5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -487,6 +487,8 @@
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
### --- Ledger Offloading --- ###
# Driver to use to offload old data to long term storage (Possible values: S3,
aws-s3, google-cloud-storage)
+# When using google-cloud-storage, Make sure both Google Cloud Storage and
Google Cloud Storage JSON API are enabled for
+# the project (check from Developers Console -> Api&auth -> APIs).
managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
@@ -507,6 +509,23 @@ s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
+# For Google Cloud Storage ledger offload, region where offload bucket is
located.
+# reference this page for more details:
https://cloud.google.com/storage/docs/bucket-locations
+gcsManagedLedgerOffloadRegion=
+
+# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger
into
+gcsManagedLedgerOffloadBucket=
+
+# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by
default, 5MB minimum)
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by
default)
+gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
+
+# For Google Cloud Storage, path to json file containing service account
credentials.
+# For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
+gcsManagedLedgerOffloadServiceAccountKeyFile=
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1bb6fff..182bc09 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -502,6 +502,25 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(minValue = 1024)
private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
+ // For Google Cloud Storage ledger offload, region where offload bucket is
located.
+ // reference this page for more details:
https://cloud.google.com/storage/docs/bucket-locations
+ private String gcsManagedLedgerOffloadRegion = null;
+
+ // For Google Cloud Storage ledger offload, Bucket to place offloaded
ledger into
+ private String gcsManagedLedgerOffloadBucket = null;
+
+ // For Google Cloud Storage ledger offload, Max block size in bytes.
+ @FieldContext(minValue = 5242880) // 5MB
+ private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
// 64MB
+
+ // For Google Cloud Storage ledger offload, Read buffer size in bytes.
+ @FieldContext(minValue = 1024)
+ private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
+
+ // For Google Cloud Storage, path to json file containing service account
credentials.
+ // For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
+ private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
+
public String getZookeeperServers() {
return zookeeperServers;
}
@@ -1733,6 +1752,46 @@ public class ServiceConfiguration implements
PulsarConfiguration {
return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
}
+ public void setGcsManagedLedgerOffloadRegion(String region) {
+ this.gcsManagedLedgerOffloadRegion = region;
+ }
+
+ public String getGcsManagedLedgerOffloadRegion() {
+ return this.gcsManagedLedgerOffloadRegion;
+ }
+
+ public void setGcsManagedLedgerOffloadBucket(String bucket) {
+ this.gcsManagedLedgerOffloadBucket = bucket;
+ }
+
+ public String getGcsManagedLedgerOffloadBucket() {
+ return this.gcsManagedLedgerOffloadBucket;
+ }
+
+ public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int
blockSizeInBytes) {
+ this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
+ }
+
+ public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
+ return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
+ }
+
+ public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int
readBufferSizeInBytes) {
+ this.gcsManagedLedgerOffloadReadBufferSizeInBytes =
readBufferSizeInBytes;
+ }
+
+ public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
+ return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
+ }
+
+ public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String
keyPath) {
+ this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
+ }
+
+ public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
+ return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
+ }
+
public void setBrokerServiceCompactionMonitorIntervalInSeconds(int
interval) {
this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index 528ba28..418c14f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.broker.offload.impl;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -49,9 +53,11 @@ import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Credentials;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
+import org.jclouds.googlecloud.GoogleCredentialsFromJson;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.s3.reference.S3Constants;
@@ -63,6 +69,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public static final String[] DRIVER_NAMES = {"S3", "aws-s3",
"google-cloud-storage"};
+ // use these keys for both s3 and gcs.
static final String METADATA_FORMAT_VERSION_KEY =
"S3ManagedLedgerOffloaderFormatVersion";
static final String METADATA_SOFTWARE_VERSION_KEY =
"S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY =
"S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -72,11 +79,19 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return Arrays.stream(DRIVER_NAMES).anyMatch(d ->
d.equalsIgnoreCase(driver));
}
+ public static boolean isS3Driver(String driver) {
+ return driver.equalsIgnoreCase(DRIVER_NAMES[0]) ||
driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+ }
+
+ public static boolean isGcsDriver(String driver) {
+ return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
+ }
+
private static void addVersionInfo(BlobBuilder blobBuilder) {
blobBuilder.userMetadata(ImmutableMap.of(
- METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
- METADATA_SOFTWARE_VERSION_KEY,
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
- METADATA_SOFTWARE_GITSHA_KEY,
PulsarBrokerVersionStringUtils.getGitSha()));
+ METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION,
+ METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+ METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getGitSha()));
}
private final VersionCheck VERSION_CHECK = (key, blob) -> {
@@ -104,30 +119,91 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
OrderedScheduler
scheduler)
throws PulsarServerException {
String driver = conf.getManagedLedgerOffloadDriver();
- String region = conf.getS3ManagedLedgerOffloadRegion();
- String bucket = conf.getS3ManagedLedgerOffloadBucket();
- String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
- int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes();
- int readBufferSize =
conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes();
+ if (!driverSupported(driver)) {
+ throw new PulsarServerException(
+ "Not support this kind of driver as offload backend: " +
driver);
+ }
- if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
+ String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
+ String region = isS3Driver(driver) ?
+ conf.getS3ManagedLedgerOffloadRegion() :
+ conf.getGcsManagedLedgerOffloadRegion();
+ String bucket = isS3Driver(driver) ?
+ conf.getS3ManagedLedgerOffloadBucket() :
+ conf.getGcsManagedLedgerOffloadBucket();
+ int maxBlockSize = isS3Driver(driver) ?
+ conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() :
+ conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes();
+ int readBufferSize = isS3Driver(driver) ?
+ conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() :
+ conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();
+
+ if (isS3Driver(driver) && Strings.isNullOrEmpty(region) &&
Strings.isNullOrEmpty(endpoint)) {
throw new PulsarServerException(
"Either s3ManagedLedgerOffloadRegion or
s3ManagedLedgerOffloadServiceEndpoint must be set"
+ " if s3 offload enabled");
}
+
if (Strings.isNullOrEmpty(bucket)) {
- throw new PulsarServerException("s3ManagedLedgerOffloadBucket
cannot be empty if s3 offload enabled");
+ throw new PulsarServerException(
+ "ManagedLedgerOffloadBucket cannot be empty for s3 and gcs
offload");
}
if (maxBlockSize < 5*1024*1024) {
- throw new
PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less
than 5MB");
+ throw new PulsarServerException(
+ "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than
5MB for s3 and gcs offload");
}
- return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
maxBlockSize, readBufferSize, endpoint, region);
+ Credentials credentials = getCredentials(driver, conf);
+
+ return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
maxBlockSize, readBufferSize, endpoint, region, credentials);
+ }
+
+ public static Credentials getCredentials(String driver,
ServiceConfiguration conf) throws PulsarServerException {
+ // credentials:
+ // for s3, get by DefaultAWSCredentialsProviderChain.
+ // for gcs, use downloaded file 'google_creds.json', which contains
service account key by
+ // following instructions in page
https://support.google.com/googleapi/answer/6158849
+
+ if (isGcsDriver(driver)) {
+ String gcsKeyPath =
conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
+ if (Strings.isNullOrEmpty(gcsKeyPath)) {
+ throw new PulsarServerException(
+ "The service account key path is empty for GCS driver");
+ }
+ try {
+ String gcsKeyContent = Files.toString(new File(gcsKeyPath),
Charset.defaultCharset());
+ return new GoogleCredentialsFromJson(gcsKeyContent).get();
+ } catch (IOException ioe) {
+ log.error("Cannot read GCS service account credentials file:
{}", gcsKeyPath);
+ throw new PulsarServerException(ioe);
+ }
+ } else if (isS3Driver(driver)) {
+ AWSCredentials credentials = null;
+ try {
+ DefaultAWSCredentialsProviderChain creds =
DefaultAWSCredentialsProviderChain.getInstance();
+ credentials = creds.getCredentials();
+ } catch (Exception e) {
+ // allowed, some mock s3 service not need credential
+ log.error("Exception when get credentials for s3 ", e);
+ throw new PulsarServerException(e);
+ }
+
+ String id = "accesskey";
+ String key = "secretkey";
+ if (credentials != null) {
+ id = credentials.getAWSAccessKeyId();
+ key = credentials.getAWSSecretKey();
+ }
+ return new Credentials(id, key);
+ } else {
+ throw new PulsarServerException(
+ "Not support this kind of driver: " + driver);
+ }
}
// build context for jclouds BlobStoreContext
BlobStoreManagedLedgerOffloader(String driver, String container,
OrderedScheduler scheduler,
- int maxBlockSize, int readBufferSize,
String endpoint, String region) {
+ int maxBlockSize, int readBufferSize, String
endpoint, String region, Credentials credentials) {
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;
@@ -142,24 +218,9 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
overrides.setProperty(Constants.PROPERTY_MAX_RETRIES,
Integer.toString(100));
ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+ contextBuilder.credentials(credentials.identity,
credentials.credential);
- AWSCredentials credentials = null;
- try {
- DefaultAWSCredentialsProviderChain creds =
DefaultAWSCredentialsProviderChain.getInstance();
- credentials = creds.getCredentials();
- } catch (Exception e) {
- log.error("Exception when get credentials for s3 ", e);
- }
-
- String id = "accesskey";
- String key = "secretkey";
- if (credentials != null) {
- id = credentials.getAWSAccessKeyId();
- key = credentials.getAWSSecretKey();
- }
- contextBuilder.credentials(id, key);
-
- if (!Strings.isNullOrEmpty(endpoint)) {
+ if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
contextBuilder.endpoint(endpoint);
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
}
@@ -174,7 +235,8 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
this.blobStore = context.getBlobStore();
}
- // build context for jclouds BlobStoreContext
+ // build context for jclouds BlobStoreContext, mostly used in test
+ @VisibleForTesting
BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container,
OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize) {
this.scheduler = scheduler;
@@ -192,6 +254,14 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
}
+ public boolean createBucket() {
+ return blobStore.createContainerInLocation(location, bucket);
+ }
+
+ public void deleteBucket() {
+ blobStore.deleteContainer(bucket);
+ }
+
// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new
Block,
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 19463d2..86b4657 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -180,6 +181,55 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
}
@Test
+ public void testGcsNoKeyPath() throws Exception {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+ conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+
+ try {
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+ Assert.fail("Should have thrown exception");
+ } catch (PulsarServerException pse) {
+ // correct
+ log.error("Expected pse", pse);
+ }
+ }
+
+ @Test
+ public void testGcsNoBucketConfigured() throws Exception {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+ File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+
+ try {
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+ Assert.fail("Should have thrown exception");
+ } catch (PulsarServerException pse) {
+ // correct
+ log.error("Expected pse", pse);
+ }
+ }
+
+ @Test
+ public void testGcsSmallBlockSizeConfigured() throws Exception {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setManagedLedgerOffloadDriver("google-cloud-storage");
+ File tmpKeyFile = File.createTempFile("gcsOffload", "json");
+
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
+ conf.setGcsManagedLedgerOffloadBucket(BUCKET);
+ conf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(1024);
+
+ try {
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
+ Assert.fail("Should have thrown exception");
+ } catch (PulsarServerException pse) {
+ // correct
+ log.error("Expected pse", pse);
+ }
+ }
+
+ @Test
public void testOffloadAndRead() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,