This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 047cb0e3117 [feat] [tiered-storage] Add pure S3 provider for the
offloader (#15710)
047cb0e3117 is described below
commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9
Author: Yong Zhang <[email protected]>
AuthorDate: Wed May 25 15:37:33 2022 +0800
[feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
* [improve] [tiered-storage] Add pure S3 provider for the offloader
---
*Motivation*
Some cloud storage services, such as aliyun-oss, are compatible
with the S3 APIs. Users of other S3-compatible storage services
also want to offload data to them, but we currently only support
AWS and Aliyun.
The PR https://github.com/apache/pulsar/pull/8985 provides
the Aliyun offload provider, but it forcibly sets
`S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS` to `true`. That
restriction does not apply to other storage services that are
compatible with the S3 APIs.
This PR provides a more general offload provider, `S3`, which uses
pure JClouds S3 metadata and allows people to override the
default JClouds properties through system properties or environment
variables whose names start with `jclouds`.
*Modifications*
- Add the pure S3 offload provider
---
.../jcloud/provider/JCloudBlobStoreProvider.java | 54 ++++++++++++++++------
.../provider/TieredStorageConfiguration.java | 13 ++++++
.../provider/JCloudBlobStoreProviderTests.java | 31 ++++++++++++-
.../provider/TieredStorageConfigurationTests.java | 17 +++++++
4 files changed, 99 insertions(+), 16 deletions(-)
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 28f3490b6eb..5969d2d2c3f 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -176,17 +176,34 @@ public enum JCloudBlobStoreProvider implements
Serializable, ConfigValidation, B
ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new
S3ApiMetadata(), "")) {
@Override
public void validate(TieredStorageConfiguration config) throws
IllegalArgumentException {
- ALIYUN_OSS_VALIDATION.validate(config);
+ S3_VALIDATION.validate(config);
}
@Override
public BlobStore getBlobStore(TieredStorageConfiguration config) {
- return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+ return S3_BLOB_STORE_BUILDER.getBlobStore(config);
}
@Override
public void buildCredentials(TieredStorageConfiguration config) {
- ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+ S3_CREDENTIAL_BUILDER.buildCredentials(config);
+ }
+ },
+
+    // Generic S3-compatible provider registered under the driver name "S3".
+    // Every operation delegates to the shared S3_* helpers; the overrides are
+    // listed in the same order as the ALIYUN_OSS constant above.
+    S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            S3_VALIDATION.validate(config);
+        }
+
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
         }
     },
@@ -369,12 +386,14 @@ public enum JCloudBlobStoreProvider implements
Serializable, ConfigValidation, B
}
};
- static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER =
(TieredStorageConfiguration config) -> {
+ static final BlobStoreBuilder S3_BLOB_STORE_BUILDER =
(TieredStorageConfiguration config) -> {
ContextBuilder contextBuilder =
ContextBuilder.newBuilder(config.getProviderMetadata());
ShadedJCloudsUtils.addStandardModules(contextBuilder);
Properties overrides = config.getOverrides();
- // For security reasons, OSS supports only virtual hosted style access.
- overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS,
"true");
+ if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+ // For security reasons, OSS supports only virtual hosted style
access.
+
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+ }
contextBuilder.overrides(overrides);
contextBuilder.endpoint(config.getServiceEndpoint());
@@ -391,7 +410,7 @@ public enum JCloudBlobStoreProvider implements
Serializable, ConfigValidation, B
}
};
- static final ConfigValidation ALIYUN_OSS_VALIDATION =
(TieredStorageConfiguration config) -> {
+ static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration
config) -> {
if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
throw new IllegalArgumentException(
"ServiceEndpoint must specified for " + config.getDriver()
+ " offload");
@@ -409,14 +428,21 @@ public enum JCloudBlobStoreProvider implements
Serializable, ConfigValidation, B
}
};
- static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER =
(TieredStorageConfiguration config) -> {
- String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
- if (StringUtils.isEmpty(accountName)) {
- throw new IllegalArgumentException("Couldn't get the aliyun oss
access key id.");
+    // Builds the credentials for the S3-compatible drivers from environment variables.
+    // The access key id is read from ACCESS_KEY_ID and the secret from ACCESS_KEY_SECRET;
+    // when either is blank, the matching ALIYUN_OSS_* variable is used instead.
+    // Throws IllegalArgumentException when the id or the secret is still blank.
+    static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        // Backward compatibility: fall back to the Aliyun-specific variable name.
+        if (StringUtils.isEmpty(accountName.trim())) {
+            accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+        }
+        if (StringUtils.isEmpty(accountName.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key id.");
+        }
+        // The secret has its own variable; reading ACCESS_KEY_ID here would use the key id as the secret.
+        String accountKey = System.getenv().getOrDefault("ACCESS_KEY_SECRET", "");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
         }
-        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
-        if (StringUtils.isEmpty(accountKey)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key secret.");
         }
         Credentials credentials = new Credentials(
                 accountName, accountKey);
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index c1054969a42..18e3bbf0db8 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
}
+ // load more jclouds properties into the overrides
+ System.getProperties().entrySet().stream()
+ .filter(p -> p.getKey().toString().startsWith("jclouds"))
+ .forEach(jcloudsProp -> {
+ overrides.setProperty(jcloudsProp.getKey().toString(),
jcloudsProp.getValue().toString());
+ });
+
+ System.getenv().entrySet().stream()
+ .filter(p -> p.getKey().toString().startsWith("jclouds"))
+ .forEach(jcloudsProp -> {
+ overrides.setProperty(jcloudsProp.getKey().toString(),
jcloudsProp.getValue().toString());
+ });
+
log.info("getOverrides: {}", overrides.toString());
return overrides;
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
-import
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.testng.annotations.Test;
public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
config = new TieredStorageConfiguration(map);
JCloudBlobStoreProvider.TRANSIENT.validate(config);
}
+
+    /** A configuration naming the S3 driver, a service endpoint and a bucket passes validation. */
+    @Test
+    public void s3ValidationTest() {
+        Map<String, String> offloadProps = new HashMap<>();
+        offloadProps.put("managedLedgerOffloadBucket", "test-s3-bucket");
+        offloadProps.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        offloadProps.put("managedLedgerOffloadDriver", "S3");
+
+        TieredStorageConfiguration conf = new TieredStorageConfiguration(offloadProps);
+        conf.getProvider().validate(conf);
+    }
+
+    /** Validation rejects an S3 configuration that has no service endpoint. */
+    @Test(
+            expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+    public void s3ValidationServiceEndpointMissed() {
+        Map<String, String> offloadProps = new HashMap<>();
+        offloadProps.put("managedLedgerOffloadDriver", "S3");
+
+        TieredStorageConfiguration conf = new TieredStorageConfiguration(offloadProps);
+        conf.getProvider().validate(conf);
+    }
+
+    /** Validation rejects an S3 configuration that has an endpoint but no bucket. */
+    @Test(
+            expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+    public void s3ValidationBucketMissed() {
+        Map<String, String> offloadProps = new HashMap<>();
+        offloadProps.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        offloadProps.put("managedLedgerOffloadDriver", "S3");
+
+        TieredStorageConfiguration conf = new TieredStorageConfiguration(offloadProps);
+        conf.getProvider().validate(conf);
+    }
}
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
import org.jclouds.domain.Credentials;
import org.testng.annotations.Test;
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
}
+
+    /**
+     * Verifies that {@code jclouds.*} system properties are copied into the overrides
+     * returned by {@code getOverrides()}, and that the configured service endpoint is kept.
+     * The system properties set here are JVM-wide, so their previous values are restored
+     * afterwards to keep them from leaking into other tests.
+     */
+    @Test
+    public void overridePropertiesTest() {
+        final String previousPropertyA = System.getProperty("jclouds.SystemPropertyA");
+        final String previousRegion = System.getProperty("jclouds.region");
+        try {
+            Map<String, String> map = new HashMap<>();
+            map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+            map.put("s3ManagedLedgerOffloadRegion", "my-region");
+            System.setProperty("jclouds.SystemPropertyA", "A");
+            System.setProperty("jclouds.region", "jclouds-region");
+            TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+            Properties properties = config.getOverrides();
+            assertEquals(properties.get("jclouds.region"), "jclouds-region");
+            assertEquals(config.getServiceEndpoint(), "http://localhost");
+            assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+        } finally {
+            restoreSystemProperty("jclouds.SystemPropertyA", previousPropertyA);
+            restoreSystemProperty("jclouds.region", previousRegion);
+        }
+    }
+
+    /** Puts a system property back to {@code previous}, clearing it when it was unset before. */
+    private static void restoreSystemProperty(String key, String previous) {
+        if (previous == null) {
+            System.clearProperty(key);
+        } else {
+            System.setProperty(key, previous);
+        }
+    }
}