This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 83d33d4 [tiered storage] Provide LedgerOffloaderFactory for creating
offloaders (#2392)
83d33d4 is described below
commit 83d33d4a7ff9523be7892324bdc322ed6480ad38
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Aug 17 08:38:17 2018 -0700
[tiered storage] Provide LedgerOffloaderFactory for creating offloaders
(#2392)
* [tiered storage] Provide LedgerOffloaderFactory for creating offloaders
### Motivation
In order to use NAR for packaging offloaders, we need a factory interface
for creating offloaders.
### Changes
- Provide a ledger offloader factory interface for creating offloaders.
- Move implemention specific settings to implementation package to be
respecting to offloader factory interface
* remove unneeded change
---
.../apache/bookkeeper/mledger/LedgerOffloader.java | 2 +-
.../bookkeeper/mledger/LedgerOffloaderFactory.java | 57 ++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 120 +--------------------
.../org/apache/pulsar/broker/PulsarService.java | 33 ++----
.../jcloud/JCloudLedgerOffloaderFactory.java | 51 +++++++++
.../jcloud/TieredStorageConfigurationData.java | 30 +++++-
6 files changed, 150 insertions(+), 143 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 719b0c9..6885500 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
/**
- * Interface for offloading ledgers to longterm storage
+ * Interface for offloading ledgers to long-term storage
*/
@Beta
public interface LedgerOffloader {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
new file mode 100644
index 0000000..f0a6890
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+
+/**
+ * Factory to create {@link LedgerOffloader} to offload ledgers into long-term
storage.
+ */
+@LimitedPrivate
+@Evolving
+public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
+
+ /**
+ * Check whether the provided driver <tt>driverName</tt> is supported.
+ *
+ * @param driverName offloader driver name
+ * @return true if the driver is supported, otherwise false.
+ */
+ boolean isDriverSupported(String driverName);
+
+ /**
+ * Create a ledger offloader with the provided configuration,
user-metadata and scheduler.
+ *
+ * @param properties service configuration
+ * @param userMetadata user metadata
+ * @param scheduler scheduler
+ * @return the offloader instance
+ * @throws IOException when fail to create an offloader
+ */
+ T create(Properties properties,
+ Map<String, String> userMetadata,
+ OrderedScheduler scheduler)
+ throws IOException;
+
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c18ec2d..9efa2a6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -480,48 +480,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private boolean exposePublisherStats = true;
/**** --- Ledger Offloading --- ****/
+ /****
+ * NOTES: all implementation related settings should be put in
implementation package.
+ * only common settings like driver name, io threads can be added
here.
+ ****/
// Driver to use to offload old data to long term storage
private String managedLedgerOffloadDriver = null;
// Maximum number of thread pool threads for ledger offloading
private int managedLedgerOffloadMaxThreads = 2;
- // For Amazon S3 ledger offload, AWS region
- private String s3ManagedLedgerOffloadRegion = null;
-
- // For Amazon S3 ledger offload, Bucket to place offloaded ledger into
- private String s3ManagedLedgerOffloadBucket = null;
-
- // For Amazon S3 ledger offload, Alternative endpoint to connect to
(useful for testing)
- private String s3ManagedLedgerOffloadServiceEndpoint = null;
-
- // For Amazon S3 ledger offload, Max block size in bytes.
- @FieldContext(minValue = 5242880) // 5MB
- private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
// 64MB
-
- // For Amazon S3 ledger offload, Read buffer size in bytes.
- @FieldContext(minValue = 1024)
- private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
-
- // For Google Cloud Storage ledger offload, region where offload bucket is
located.
- // reference this page for more details:
https://cloud.google.com/storage/docs/bucket-locations
- private String gcsManagedLedgerOffloadRegion = null;
-
- // For Google Cloud Storage ledger offload, Bucket to place offloaded
ledger into
- private String gcsManagedLedgerOffloadBucket = null;
-
- // For Google Cloud Storage ledger offload, Max block size in bytes.
- @FieldContext(minValue = 5242880) // 5MB
- private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
// 64MB
-
- // For Google Cloud Storage ledger offload, Read buffer size in bytes.
- @FieldContext(minValue = 1024)
- private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
-
- // For Google Cloud Storage, path to json file containing service account
credentials.
- // For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
- private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
-
public String getZookeeperServers() {
return zookeeperServers;
}
@@ -1721,86 +1689,6 @@ public class ServiceConfiguration implements
PulsarConfiguration {
return this.managedLedgerOffloadMaxThreads;
}
- public void setS3ManagedLedgerOffloadRegion(String region) {
- this.s3ManagedLedgerOffloadRegion = region;
- }
-
- public String getS3ManagedLedgerOffloadRegion() {
- return this.s3ManagedLedgerOffloadRegion;
- }
-
- public void setS3ManagedLedgerOffloadBucket(String bucket) {
- this.s3ManagedLedgerOffloadBucket = bucket;
- }
-
- public String getS3ManagedLedgerOffloadBucket() {
- return this.s3ManagedLedgerOffloadBucket;
- }
-
- public void setS3ManagedLedgerOffloadServiceEndpoint(String endpoint) {
- this.s3ManagedLedgerOffloadServiceEndpoint = endpoint;
- }
-
- public String getS3ManagedLedgerOffloadServiceEndpoint() {
- return this.s3ManagedLedgerOffloadServiceEndpoint;
- }
-
- public void setS3ManagedLedgerOffloadMaxBlockSizeInBytes(int
blockSizeInBytes) {
- this.s3ManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
- }
-
- public int getS3ManagedLedgerOffloadMaxBlockSizeInBytes() {
- return this.s3ManagedLedgerOffloadMaxBlockSizeInBytes;
- }
-
- public void setS3ManagedLedgerOffloadReadBufferSizeInBytes(int
readBufferSizeInBytes) {
- this.s3ManagedLedgerOffloadReadBufferSizeInBytes =
readBufferSizeInBytes;
- }
-
- public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
- return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
- }
-
- public void setGcsManagedLedgerOffloadRegion(String region) {
- this.gcsManagedLedgerOffloadRegion = region;
- }
-
- public String getGcsManagedLedgerOffloadRegion() {
- return this.gcsManagedLedgerOffloadRegion;
- }
-
- public void setGcsManagedLedgerOffloadBucket(String bucket) {
- this.gcsManagedLedgerOffloadBucket = bucket;
- }
-
- public String getGcsManagedLedgerOffloadBucket() {
- return this.gcsManagedLedgerOffloadBucket;
- }
-
- public void setGcsManagedLedgerOffloadMaxBlockSizeInBytes(int
blockSizeInBytes) {
- this.gcsManagedLedgerOffloadMaxBlockSizeInBytes = blockSizeInBytes;
- }
-
- public int getGcsManagedLedgerOffloadMaxBlockSizeInBytes() {
- return this.gcsManagedLedgerOffloadMaxBlockSizeInBytes;
- }
-
- public void setGcsManagedLedgerOffloadReadBufferSizeInBytes(int
readBufferSizeInBytes) {
- this.gcsManagedLedgerOffloadReadBufferSizeInBytes =
readBufferSizeInBytes;
- }
-
- public int getGcsManagedLedgerOffloadReadBufferSizeInBytes() {
- return this.gcsManagedLedgerOffloadReadBufferSizeInBytes;
- }
-
- public void setGcsManagedLedgerOffloadServiceAccountKeyFile(String
keyPath) {
- this.gcsManagedLedgerOffloadServiceAccountKeyFile = keyPath;
- }
-
- public String getGcsManagedLedgerOffloadServiceAccountKeyFile() {
- return this.gcsManagedLedgerOffloadServiceAccountKeyFile;
- }
-
public void setBrokerServiceCompactionMonitorIntervalInSeconds(int
interval) {
this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c4b89fb..d965c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -47,8 +47,10 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import
org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -62,8 +64,6 @@ import
org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import
org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
-import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -664,11 +664,14 @@ public class PulsarService implements AutoCloseable {
public synchronized LedgerOffloader
createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
try {
+ // TODO: will make this configurable when switching to use NAR
loader to load offloaders
+ LedgerOffloaderFactory offloaderFactory =
JCloudLedgerOffloaderFactory.of();
+
if (conf.getManagedLedgerOffloadDriver() != null
- &&
BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
{
+ &&
offloaderFactory.isDriverSupported(conf.getManagedLedgerOffloadDriver())) {
try {
- return BlobStoreManagedLedgerOffloader.create(
- getTieredStorageConf(conf),
+ return offloaderFactory.create(
+ conf.getProperties(),
ImmutableMap.of(
METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getGitSha()
@@ -685,26 +688,6 @@ public class PulsarService implements AutoCloseable {
}
}
- private static TieredStorageConfigurationData
getTieredStorageConf(ServiceConfiguration serverConf) {
- TieredStorageConfigurationData tsConf = new
TieredStorageConfigurationData();
- // generic settings
-
tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
-
tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
- // s3 settings
-
tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
-
tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
-
tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
-
tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
-
tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
- // gcs settings
-
tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
-
tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
-
tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
-
tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
-
tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
- return tsConf;
- }
-
public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
new file mode 100644
index 0000000..dffe253
--- /dev/null
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
+
+/**
+ * A jcloud based offloader factory.
+ */
+public class JCloudLedgerOffloaderFactory implements
LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {
+
+ public static JCloudLedgerOffloaderFactory of() {
+ return INSTANCE;
+ }
+
+ private static final JCloudLedgerOffloaderFactory INSTANCE = new
JCloudLedgerOffloaderFactory();
+
+ @Override
+ public boolean isDriverSupported(String driverName) {
+ return BlobStoreManagedLedgerOffloader.driverSupported(driverName);
+ }
+
+ @Override
+ public BlobStoreManagedLedgerOffloader create(Properties properties,
+ Map<String, String>
userMetadata,
+ OrderedScheduler scheduler)
throws IOException {
+ TieredStorageConfigurationData data =
TieredStorageConfigurationData.create(properties);
+ return BlobStoreManagedLedgerOffloader.create(data, userMetadata,
scheduler);
+ }
+}
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
index 7c0d26a..52fedfd 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/TieredStorageConfigurationData.java
@@ -18,14 +18,19 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud;
+import static org.apache.pulsar.common.util.FieldParser.value;
+
import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
import lombok.Data;
/**
* Configuration for tiered storage.
*/
@Data
-public class TieredStorageConfigurationData implements Serializable, Cloneable{
+public class TieredStorageConfigurationData implements Serializable, Cloneable
{
/**** --- Ledger Offloading --- ****/
// Driver to use to offload old data to long term storage
@@ -66,4 +71,27 @@ public class TieredStorageConfigurationData implements
Serializable, Cloneable{
// For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
+ /**
+ * Create a tiered storage configuration from the provided
<tt>properties</tt>.
+ *
+ * @param properties the configuration properties
+ * @return tiered storage configuration
+ */
+ public static TieredStorageConfigurationData create(Properties properties)
{
+ TieredStorageConfigurationData data = new
TieredStorageConfigurationData();
+ Field[] fields =
TieredStorageConfigurationData.class.getDeclaredFields();
+ Arrays.stream(fields).forEach(f -> {
+ if (properties.containsKey(f.getName())) {
+ try {
+ f.setAccessible(true);
+ f.set(data, value((String) properties.get(f.getName()),
f));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("failed
to initialize %s field while setting value %s",
+ f.getName(), properties.get(f.getName())), e);
+ }
+ }
+ });
+ return data;
+ }
+
}