sijie closed pull request #2151: GCS offload support(3): add configs to support GCS driver
URL: https://github.com/apache/incubator-pulsar/pull/2151
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/broker.conf b/conf/broker.conf index 96433aa517..11a87d5071 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -487,6 +487,8 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe ### --- Ledger Offloading --- ### # Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage) +# When using google-cloud-storage, Make sure both Google Cloud Storage and Google Cloud Storage JSON API are enabled for +# the project (check from Developers Console -> Api&auth -> APIs). managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading @@ -507,6 +509,23 @@ s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864 # For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default) s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576 +# For Google Cloud Storage ledger offload, region where offload bucket is located. +# reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations +gcsManagedLedgerOffloadRegion= + +# For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into +gcsManagedLedgerOffloadBucket= + +# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by default, 5MB minimum) +gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864 + +# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by default) +gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576 + +# For Google Cloud Storage, path to json file containing service account credentials. 
+# For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849 +gcsManagedLedgerOffloadServiceAccountKeyFile= + ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1bb6fff168..182bc0939f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -502,6 +502,25 @@ @FieldContext(minValue = 1024) private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB + // For Google Cloud Storage ledger offload, region where offload bucket is located. + // reference this page for more details: https://cloud.google.com/storage/docs/bucket-locations + private String gcsManagedLedgerOffloadRegion = null; + + // For Google Cloud Storage ledger offload, Bucket to place offloaded ledger into + private String gcsManagedLedgerOffloadBucket = null; + + // For Google Cloud Storage ledger offload, Max block size in bytes. + @FieldContext(minValue = 5242880) // 5MB + private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024; // 64MB + + // For Google Cloud Storage ledger offload, Read buffer size in bytes. + @FieldContext(minValue = 1024) + private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; // 1MB + + // For Google Cloud Storage, path to json file containing service account credentials. 
+ // For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849 + private String gcsManagedLedgerOffloadServiceAccountKeyFile = null; + public String getZookeeperServers() { return zookeeperServers; } @@ -1733,6 +1752,46 @@ public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() { return this.s3ManagedLedgerOffloadReadBufferSizeInBytes; } + public void setGcsManagedLedgerOffloadRegion(String region) { + this.gcsManagedLedgerOffloadRegion = region; + } + + public String getGcsManagedLedgerOffloadRegion() { + return this.gcsManagedLedgerOffloadRegion; + } + + public void setGcsManagedLedgerOffloadBucket(String bucket) { + this.gcsManagedLedgerOffloadBucket = bucket; + } + + public String getGcsManagedLedgerOffloadBucket() { + return this.gcsManagedLedgerOffloadBucket; + } + + public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int blockSizeInBytes) { + this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes; + } + + public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() { + return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes; + } + + public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int readBufferSizeInBytes) { + this.gcsManagedLedgerOffloadReadBufferSizeInBytes = readBufferSizeInBytes; + } + + public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() { + return this.gcsManagedLedgerOffloadReadBufferSizeInBytes; + } + + public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String keyPath) { + this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath; + } + + public String getGcsManagedLedgerOffloadServiceAccountKeyFile() { + return this.gcsManagedLedgerOffloadServiceAccountKeyFile; + } + public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) { this.brokerServiceCompactionMonitorIntervalInSeconds = interval; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java index 528ba28348..418c14f04a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java @@ -20,11 +20,15 @@ import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.io.Files; +import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -49,9 +53,11 @@ import org.jclouds.blobstore.domain.MultipartPart; import org.jclouds.blobstore.domain.MultipartUpload; import org.jclouds.blobstore.options.PutOptions; +import org.jclouds.domain.Credentials; import org.jclouds.domain.Location; import org.jclouds.domain.LocationBuilder; import org.jclouds.domain.LocationScope; +import org.jclouds.googlecloud.GoogleCredentialsFromJson; import org.jclouds.io.Payload; import org.jclouds.io.Payloads; import org.jclouds.s3.reference.S3Constants; @@ -63,6 +69,7 @@ public static final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; + // use these keys for both s3 and gcs. 
static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion"; static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion"; static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha"; @@ -72,11 +79,19 @@ public static boolean driverSupported(String driver) { return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver)); } + public static boolean isS3Driver(String driver) { + return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]); + } + + public static boolean isGcsDriver(String driver) { + return driver.equalsIgnoreCase(DRIVER_NAMES[2]); + } + private static void addVersionInfo(BlobBuilder blobBuilder) { blobBuilder.userMetadata(ImmutableMap.of( - METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION, - METADATA_SOFTWARE_VERSION_KEY, PulsarBrokerVersionStringUtils.getNormalizedVersionString(), - METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha())); + METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION, + METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getNormalizedVersionString(), + METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarBrokerVersionStringUtils.getGitSha())); } private final VersionCheck VERSION_CHECK = (key, blob) -> { @@ -104,30 +119,91 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf, OrderedScheduler scheduler) throws PulsarServerException { String driver = conf.getManagedLedgerOffloadDriver(); - String region = conf.getS3ManagedLedgerOffloadRegion(); - String bucket = conf.getS3ManagedLedgerOffloadBucket(); - String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); - int maxBlockSize = conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes(); - int readBufferSize = conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes(); + if (!driverSupported(driver)) { + throw new PulsarServerException( + "Not support this kind of 
driver as offload backend: " + driver); + } - if (Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) { + String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint(); + String region = isS3Driver(driver) ? + conf.getS3ManagedLedgerOffloadRegion() : + conf.getGcsManagedLedgerOffloadRegion(); + String bucket = isS3Driver(driver) ? + conf.getS3ManagedLedgerOffloadBucket() : + conf.getGcsManagedLedgerOffloadBucket(); + int maxBlockSize = isS3Driver(driver) ? + conf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes() : + conf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes(); + int readBufferSize = isS3Driver(driver) ? + conf.getS3ManagedLedgerOffloadReadBufferSizeInBytes() : + conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes(); + + if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) { throw new PulsarServerException( "Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set" + " if s3 offload enabled"); } + if (Strings.isNullOrEmpty(bucket)) { - throw new PulsarServerException("s3ManagedLedgerOffloadBucket cannot be empty if s3 offload enabled"); + throw new PulsarServerException( + "ManagedLedgerOffloadBucket cannot be empty for s3 and gcs offload"); } if (maxBlockSize < 5*1024*1024) { - throw new PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB"); + throw new PulsarServerException( + "ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than 5MB for s3 and gcs offload"); } - return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region); + Credentials credentials = getCredentials(driver, conf); + + return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, maxBlockSize, readBufferSize, endpoint, region, credentials); + } + + public static Credentials getCredentials(String driver, ServiceConfiguration conf) throws PulsarServerException { + // credentials: + // for s3, get by 
DefaultAWSCredentialsProviderChain. + // for gcs, use downloaded file 'google_creds.json', which contains service account key by + // following instructions in page https://support.google.com/googleapi/answer/6158849 + + if (isGcsDriver(driver)) { + String gcsKeyPath = conf.getGcsManagedLedgerOffloadServiceAccountKeyFile(); + if (Strings.isNullOrEmpty(gcsKeyPath)) { + throw new PulsarServerException( + "The service account key path is empty for GCS driver"); + } + try { + String gcsKeyContent = Files.toString(new File(gcsKeyPath), Charset.defaultCharset()); + return new GoogleCredentialsFromJson(gcsKeyContent).get(); + } catch (IOException ioe) { + log.error("Cannot read GCS service account credentials file: {}", gcsKeyPath); + throw new PulsarServerException(ioe); + } + } else if (isS3Driver(driver)) { + AWSCredentials credentials = null; + try { + DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); + credentials = creds.getCredentials(); + } catch (Exception e) { + // allowed, some mock s3 service not need credential + log.error("Exception when get credentials for s3 ", e); + throw new PulsarServerException(e); + } + + String id = "accesskey"; + String key = "secretkey"; + if (credentials != null) { + id = credentials.getAWSAccessKeyId(); + key = credentials.getAWSSecretKey(); + } + return new Credentials(id, key); + } else { + throw new PulsarServerException( + "Not support this kind of driver: " + driver); + } } // build context for jclouds BlobStoreContext BlobStoreManagedLedgerOffloader(String driver, String container, OrderedScheduler scheduler, - int maxBlockSize, int readBufferSize, String endpoint, String region) { + int maxBlockSize, int readBufferSize, String endpoint, String region, Credentials credentials) { this.scheduler = scheduler; this.readBufferSize = readBufferSize; @@ -142,24 +218,9 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf, 
overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); + contextBuilder.credentials(credentials.identity, credentials.credential); - AWSCredentials credentials = null; - try { - DefaultAWSCredentialsProviderChain creds = DefaultAWSCredentialsProviderChain.getInstance(); - credentials = creds.getCredentials(); - } catch (Exception e) { - log.error("Exception when get credentials for s3 ", e); - } - - String id = "accesskey"; - String key = "secretkey"; - if (credentials != null) { - id = credentials.getAWSAccessKeyId(); - key = credentials.getAWSSecretKey(); - } - contextBuilder.credentials(id, key); - - if (!Strings.isNullOrEmpty(endpoint)) { + if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) { contextBuilder.endpoint(endpoint); overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false"); } @@ -174,7 +235,8 @@ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration conf, this.blobStore = context.getBlobStore(); } - // build context for jclouds BlobStoreContext + // build context for jclouds BlobStoreContext, mostly used in test + @VisibleForTesting BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, OrderedScheduler scheduler, int maxBlockSize, int readBufferSize) { this.scheduler = scheduler; @@ -192,6 +254,14 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId); } + public boolean createBucket() { + return blobStore.createContainerInLocation(location, bucket); + } + + public void deleteBucket() { + blobStore.deleteContainer(bucket); + } + // upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new Block, @Override public CompletableFuture<Void> offload(ReadHandle readHandle, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java index 19463d2fc0..86b4657384 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; +import java.io.File; import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; @@ -179,6 +180,55 @@ public void testSmallBlockSizeConfigured() throws Exception { } } + @Test + public void testGcsNoKeyPath() throws Exception { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setManagedLedgerOffloadDriver("google-cloud-storage"); + conf.setGcsManagedLedgerOffloadBucket(BUCKET); + + try { + BlobStoreManagedLedgerOffloader.create(conf, scheduler); + Assert.fail("Should have thrown exception"); + } catch (PulsarServerException pse) { + // correct + log.error("Expected pse", pse); + } + } + + @Test + public void testGcsNoBucketConfigured() throws Exception { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setManagedLedgerOffloadDriver("google-cloud-storage"); + File tmpKeyFile = File.createTempFile("gcsOffload", "json"); + conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath()); + + try { + BlobStoreManagedLedgerOffloader.create(conf, scheduler); + Assert.fail("Should have thrown exception"); + } catch (PulsarServerException pse) { + // correct + log.error("Expected pse", pse); + } + } + + @Test + public void testGcsSmallBlockSizeConfigured() throws Exception { + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setManagedLedgerOffloadDriver("google-cloud-storage"); + File tmpKeyFile = File.createTempFile("gcsOffload", "json"); + 
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath()); + conf.setGcsManagedLedgerOffloadBucket(BUCKET); + conf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(1024); + + try { + BlobStoreManagedLedgerOffloader.create(conf, scheduler); + Assert.fail("Should have thrown exception"); + } catch (PulsarServerException pse) { + // correct + log.error("Expected pse", pse); + } + } + @Test public void testOffloadAndRead() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
