This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 870e459 [tiered storage] move offloader implementation to a separate
module (#2371)
870e459 is described below
commit 870e459fac80909bf9ad177b67d329b476c79486
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Aug 14 11:01:15 2018 -0700
[tiered storage] move offloader implementation to a separate module (#2371)
### Motivation
The tiered storage implementation needs to be available to Pulsar SQL.
We also need to use the NAR plugin to package the different offloaders,
in order to avoid version conflicts.
### Changes
- Create `tiered-storage` and `tiered-storage/jcloud` modules.
- Move `org.apache.pulsar.broker.offload` to `tiered-storage/jcloud`.
- Add `TieredStorageConfigurationData` to hold all tiered-storage-related
configuration.
### NOTES
This change mainly relocates files.
---
pom.xml | 1 +
pulsar-broker/pom.xml | 16 ++---
.../org/apache/pulsar/broker/PulsarService.java | 39 +++++++++++-
tiered-storage/jcloud/pom.xml | 58 +++++++++++++++++
.../pulsar/broker/offload/BackedInputStream.java | 0
.../offload/BlockAwareSegmentInputStream.java | 0
.../pulsar/broker/offload/DataBlockHeader.java | 0
.../pulsar/broker/offload/OffloadIndexBlock.java | 0
.../broker/offload/OffloadIndexBlockBuilder.java | 0
.../pulsar/broker/offload/OffloadIndexEntry.java | 0
.../offload/TieredStorageConfigurationData.java | 69 ++++++++++++++++++++
.../impl/BlobStoreBackedInputStreamImpl.java | 0
.../impl/BlobStoreBackedReadHandleImpl.java | 3 +-
.../impl/BlobStoreManagedLedgerOffloader.java | 73 ++++++++++++++--------
.../impl/BlockAwareSegmentInputStreamImpl.java | 0
.../broker/offload/impl/DataBlockHeaderImpl.java | 0
.../offload/impl/OffloadIndexBlockBuilderImpl.java | 0
.../broker/offload/impl/OffloadIndexBlockImpl.java | 0
.../broker/offload/impl/OffloadIndexEntryImpl.java | 0
.../offload/BlobStoreBackedInputStreamTest.java | 0
.../pulsar/broker/offload/BlobStoreTestBase.java | 0
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 52 +++++++++------
.../impl/BlockAwareSegmentInputStreamTest.java | 0
.../broker/offload/impl/DataBlockHeaderTest.java | 0
.../broker/offload/impl/OffloadIndexTest.java | 0
tiered-storage/pom.xml | 38 +++++++++++
26 files changed, 292 insertions(+), 57 deletions(-)
diff --git a/pom.xml b/pom.xml
index 2ce7e2f..f60421c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>buildtools</module>
<module>managed-ledger</module>
<module>managed-ledger-shaded</module>
+ <module>tiered-storage</module>
<module>pulsar-common</module>
<module>pulsar-broker-common</module>
<module>pulsar-broker</module>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 8e5d3d3..f3410b5 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -99,6 +99,12 @@
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>tiered-storage-jcloud</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker-common</artifactId>
<version>${project.version}</version>
</dependency>
@@ -125,11 +131,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-core</artifactId>
- </dependency>
-
<!-- functions related dependencies (begin) -->
<dependency>
@@ -274,11 +275,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>jclouds-shaded</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ff224bc..f6319ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -61,6 +62,7 @@ import
org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
@@ -654,16 +656,51 @@ public class PulsarService implements AutoCloseable {
return offloader;
}
+ // TODO: improve the user metadata in subsequent changes
+ static final String METADATA_SOFTWARE_VERSION_KEY =
"S3ManagedLedgerOffloaderSoftwareVersion";
+ static final String METADATA_SOFTWARE_GITSHA_KEY =
"S3ManagedLedgerOffloaderSoftwareGitSha";
+
+
public synchronized LedgerOffloader
createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null
&&
BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
{
- return BlobStoreManagedLedgerOffloader.create(conf,
getOffloaderScheduler(conf));
+ try {
+ return BlobStoreManagedLedgerOffloader.create(
+ getTieredStorageConf(conf),
+ ImmutableMap.of(
+ METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+ METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getGitSha()
+ ),
+ getOffloaderScheduler(conf));
+ } catch (IOException ioe) {
+ throw new PulsarServerException(ioe.getMessage(),
ioe.getCause());
+ }
} else {
return NullLedgerOffloader.INSTANCE;
}
}
+ private static TieredStorageConfigurationData
getTieredStorageConf(ServiceConfiguration serverConf) {
+ TieredStorageConfigurationData tsConf = new
TieredStorageConfigurationData();
+ // generic settings
+
tsConf.setManagedLedgerOffloadDriver(serverConf.getManagedLedgerOffloadDriver());
+
tsConf.setManagedLedgerOffloadMaxThreads(serverConf.getManagedLedgerOffloadMaxThreads());
+ // s3 settings
+
tsConf.setS3ManagedLedgerOffloadRegion(serverConf.getS3ManagedLedgerOffloadRegion());
+
tsConf.setS3ManagedLedgerOffloadBucket(serverConf.getS3ManagedLedgerOffloadBucket());
+
tsConf.setS3ManagedLedgerOffloadServiceEndpoint(serverConf.getS3ManagedLedgerOffloadServiceEndpoint());
+
tsConf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getS3ManagedLedgerOffloadMaxBlockSizeInBytes());
+
tsConf.setS3ManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getS3ManagedLedgerOffloadReadBufferSizeInBytes());
+ // gcs settings
+
tsConf.setGcsManagedLedgerOffloadRegion(serverConf.getGcsManagedLedgerOffloadRegion());
+
tsConf.setGcsManagedLedgerOffloadBucket(serverConf.getGcsManagedLedgerOffloadBucket());
+
tsConf.setGcsManagedLedgerOffloadServiceAccountKeyFile(serverConf.getGcsManagedLedgerOffloadServiceAccountKeyFile());
+
tsConf.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(serverConf.getGcsManagedLedgerOffloadMaxBlockSizeInBytes());
+
tsConf.setGcsManagedLedgerOffloadReadBufferSizeInBytes(serverConf.getGcsManagedLedgerOffloadReadBufferSizeInBytes());
+ return tsConf;
+ }
+
public ZooKeeperCache getLocalZkCache() {
return localZkCache;
}
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
new file mode 100644
index 0000000..4c4bc39
--- /dev/null
+++ b/tiered-storage/jcloud/pom.xml
@@ -0,0 +1,58 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>tiered-storage-parent</artifactId>
+ <version>2.2.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>tiered-storage-jcloud</artifactId>
+ <name>Apache Pulsar :: Tiered Storage :: JCloud</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>jclouds-shaded</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
new file mode 100644
index 0000000..c7ef3fd
--- /dev/null
+++
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/TieredStorageConfigurationData.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.offload;
+
+import java.io.Serializable;
+import lombok.Data;
+
+/**
+ * Configuration for tiered storage.
+ */
+@Data
+public class TieredStorageConfigurationData implements Serializable, Cloneable{
+
+ /**** --- Ledger Offloading --- ****/
+ // Driver to use to offload old data to long term storage
+ private String managedLedgerOffloadDriver = null;
+
+ // Maximum number of thread pool threads for ledger offloading
+ private int managedLedgerOffloadMaxThreads = 2;
+
+ // For Amazon S3 ledger offload, AWS region
+ private String s3ManagedLedgerOffloadRegion = null;
+
+ // For Amazon S3 ledger offload, Bucket to place offloaded ledger into
+ private String s3ManagedLedgerOffloadBucket = null;
+
+ // For Amazon S3 ledger offload, Alternative endpoint to connect to
(useful for testing)
+ private String s3ManagedLedgerOffloadServiceEndpoint = null;
+
+ // For Amazon S3 ledger offload, Max block size in bytes.
+ private int s3ManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
// 64MB
+
+ // For Amazon S3 ledger offload, Read buffer size in bytes.
+ private int s3ManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
+
+ // For Google Cloud Storage ledger offload, region where offload bucket is
located.
+ // reference this page for more details:
https://cloud.google.com/storage/docs/bucket-locations
+ private String gcsManagedLedgerOffloadRegion = null;
+
+ // For Google Cloud Storage ledger offload, Bucket to place offloaded
ledger into
+ private String gcsManagedLedgerOffloadBucket = null;
+
+ // For Google Cloud Storage ledger offload, Max block size in bytes.
+ private int gcsManagedLedgerOffloadMaxBlockSizeInBytes = 64 * 1024 * 1024;
// 64MB
+
+ // For Google Cloud Storage ledger offload, Read buffer size in bytes.
+ private int gcsManagedLedgerOffloadReadBufferSizeInBytes = 1024 * 1024; //
1MB
+
+ // For Google Cloud Storage, path to json file containing service account
credentials.
+ // For more details, see the "Service Accounts" section of
https://support.google.com/googleapi/answer/6158849
+ private String gcsManagedLedgerOffloadServiceAccountKeyFile = null;
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
index 36b382b..7598d58 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.offload.impl;
-import com.amazonaws.AmazonClientException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.DataInputStream;
@@ -186,7 +185,7 @@ public class BlobStoreBackedReadHandleImpl implements
ReadHandle {
BlobStore blobStore, String bucket, String
key, String indexKey,
VersionCheck versionCheck,
long ledgerId, int readBufferSize)
- throws AmazonClientException, IOException {
+ throws IOException {
Blob blob = blobStore.getBlob(bucket, indexKey);
versionCheck.check(indexKey, blob);
OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
similarity index 86%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index fa0647c..b0345fa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -25,6 +25,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -38,12 +39,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
-import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
-import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
+import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
@@ -71,8 +70,6 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
// use these keys for both s3 and gcs.
static final String METADATA_FORMAT_VERSION_KEY =
"S3ManagedLedgerOffloaderFormatVersion";
- static final String METADATA_SOFTWARE_VERSION_KEY =
"S3ManagedLedgerOffloaderSoftwareVersion";
- static final String METADATA_SOFTWARE_GITSHA_KEY =
"S3ManagedLedgerOffloaderSoftwareGitSha";
static final String CURRENT_VERSION = String.valueOf(1);
public static boolean driverSupported(String driver) {
@@ -87,11 +84,11 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return driver.equalsIgnoreCase(DRIVER_NAMES[2]);
}
- private static void addVersionInfo(BlobBuilder blobBuilder) {
- blobBuilder.userMetadata(ImmutableMap.of(
- METADATA_FORMAT_VERSION_KEY.toLowerCase(), CURRENT_VERSION,
- METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
- METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
PulsarBrokerVersionStringUtils.getGitSha()));
+ private static void addVersionInfo(BlobBuilder blobBuilder, Map<String,
String> userMetadata) {
+ ImmutableMap.Builder<String, String> metadataBuilder =
ImmutableMap.builder();
+ metadataBuilder.putAll(userMetadata);
+ metadataBuilder.put(METADATA_FORMAT_VERSION_KEY.toLowerCase(),
CURRENT_VERSION);
+ blobBuilder.userMetadata(metadataBuilder.build());
}
private final VersionCheck VERSION_CHECK = (key, blob) -> {
@@ -99,7 +96,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
String version =
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
if (version == null || !version.equals(CURRENT_VERSION)) {
throw new IOException(String.format("Invalid object version %s for
%s, expect %s",
- version, key,
CURRENT_VERSION));
+ version, key, CURRENT_VERSION));
}
};
@@ -114,13 +111,21 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
private BlobStoreContext context;
private BlobStore blobStore;
Location location = null;
+ private final Map<String, String> userMetadata;
- public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration
conf,
+ @VisibleForTesting
+ static BlobStoreManagedLedgerOffloader
create(TieredStorageConfigurationData conf,
+ OrderedScheduler scheduler)
throws IOException {
+ return create(conf, Maps.newHashMap(), scheduler);
+ }
+
+ public static BlobStoreManagedLedgerOffloader
create(TieredStorageConfigurationData conf,
+ Map<String, String>
userMetadata,
OrderedScheduler
scheduler)
- throws PulsarServerException {
+ throws IOException {
String driver = conf.getManagedLedgerOffloadDriver();
if (!driverSupported(driver)) {
- throw new PulsarServerException(
+ throw new IOException(
"Not support this kind of driver as offload backend: " +
driver);
}
@@ -139,26 +144,27 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
conf.getGcsManagedLedgerOffloadReadBufferSizeInBytes();
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) &&
Strings.isNullOrEmpty(endpoint)) {
- throw new PulsarServerException(
+ throw new IOException(
"Either s3ManagedLedgerOffloadRegion or
s3ManagedLedgerOffloadServiceEndpoint must be set"
+ " if s3 offload enabled");
}
if (Strings.isNullOrEmpty(bucket)) {
- throw new PulsarServerException(
+ throw new IOException(
"ManagedLedgerOffloadBucket cannot be empty for s3 and gcs
offload");
}
if (maxBlockSize < 5*1024*1024) {
- throw new PulsarServerException(
+ throw new IOException(
"ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less than
5MB for s3 and gcs offload");
}
Credentials credentials = getCredentials(driver, conf);
- return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
maxBlockSize, readBufferSize, endpoint, region, credentials);
+ return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
+ maxBlockSize, readBufferSize, endpoint, region, credentials,
userMetadata);
}
- public static Credentials getCredentials(String driver,
ServiceConfiguration conf) throws PulsarServerException {
+ public static Credentials getCredentials(String driver,
TieredStorageConfigurationData conf) throws IOException {
// credentials:
// for s3, get by DefaultAWSCredentialsProviderChain.
// for gcs, use downloaded file 'google_creds.json', which contains
service account key by
@@ -167,7 +173,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
if (isGcsDriver(driver)) {
String gcsKeyPath =
conf.getGcsManagedLedgerOffloadServiceAccountKeyFile();
if (Strings.isNullOrEmpty(gcsKeyPath)) {
- throw new PulsarServerException(
+ throw new IOException(
"The service account key path is empty for GCS driver");
}
try {
@@ -175,7 +181,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return new GoogleCredentialsFromJson(gcsKeyContent).get();
} catch (IOException ioe) {
log.error("Cannot read GCS service account credentials file:
{}", gcsKeyPath);
- throw new PulsarServerException(ioe);
+ throw new IOException(ioe);
}
} else if (isS3Driver(driver)) {
AWSCredentials credentials = null;
@@ -195,19 +201,27 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
}
return new Credentials(id, key);
} else {
- throw new PulsarServerException(
+ throw new IOException(
"Not support this kind of driver: " + driver);
}
}
+
// build context for jclouds BlobStoreContext
BlobStoreManagedLedgerOffloader(String driver, String container,
OrderedScheduler scheduler,
- int maxBlockSize, int readBufferSize, String
endpoint, String region, Credentials credentials) {
+ int maxBlockSize, int readBufferSize,
String endpoint, String region, Credentials credentials) {
+ this(driver, container, scheduler, maxBlockSize, readBufferSize,
endpoint, region, credentials, Maps.newHashMap());
+ }
+
+ BlobStoreManagedLedgerOffloader(String driver, String container,
OrderedScheduler scheduler,
+ int maxBlockSize, int readBufferSize,
+ String endpoint, String region,
Credentials credentials,
+ Map<String, String> userMetadata) {
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;
-
this.bucket = container;
this.maxBlockSize = maxBlockSize;
+ this.userMetadata = userMetadata;
Properties overrides = new Properties();
// This property controls the number of parts being uploaded in
parallel.
@@ -238,11 +252,18 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
@VisibleForTesting
BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container,
OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize) {
+ this(blobStore, container, scheduler, maxBlockSize, readBufferSize,
Maps.newHashMap());
+ }
+
+ BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container,
OrderedScheduler scheduler,
+ int maxBlockSize, int readBufferSize,
+ Map<String, String> userMetadata) {
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;
this.bucket = container;
this.maxBlockSize = maxBlockSize;
this.blobStore = blobStore;
+ this.userMetadata = userMetadata;
}
static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -285,7 +306,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
// init multi part upload for data block.
try {
BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
- addVersionInfo(blobBuilder);
+ addVersionInfo(blobBuilder, userMetadata);
Blob blob = blobBuilder.build();
mpu = blobStore.initiateMultipartUpload(bucket,
blob.getMetadata(), new PutOptions());
} catch (Throwable t) {
@@ -348,7 +369,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
OffloadIndexBlock.IndexInputStream indexStream =
index.toStream()) {
// write the index block
BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
- addVersionInfo(blobBuilder);
+ addVersionInfo(blobBuilder, userMetadata);
Payload indexPayload =
Payloads.newInputStreamPayload(indexStream);
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
indexPayload.getContentMetadata().setContentType("application/octet-stream");
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
b/tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
similarity index 100%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
rename to
tiered-storage/jcloud/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
similarity index 93%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index f1e43c2..0999d63 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.offload.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey;
import static
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey;
import static org.mockito.AdditionalAnswers.delegatesTo;
@@ -26,17 +27,19 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -48,10 +51,12 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
-import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.offload.BlobStoreTestBase;
+import org.apache.pulsar.broker.offload.TieredStorageConfigurationData;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.data.ACL;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.options.CopyOptions;
import org.mockito.Mockito;
@@ -61,10 +66,21 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
-@Slf4j
class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
private static final Logger log =
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+ private static MockZooKeeper createMockZooKeeper() throws Exception {
+ MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+ List<ACL> dummyAclList = new ArrayList<ACL>(0);
+
+ ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+ "".getBytes(UTF_8), dummyAclList, CreateMode.PERSISTENT);
+
+ zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(UTF_8), dummyAclList,
+ CreateMode.PERSISTENT);
+ return zk;
+ }
+
private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
final OrderedScheduler scheduler;
@@ -72,7 +88,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
BlobStoreManagedLedgerOffloaderTest() throws Exception {
scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
- bk = new MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
+ bk = new MockBookKeeper(createMockZooKeeper());
}
private ReadHandle buildReadHandle() throws Exception {
@@ -139,35 +155,35 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
@Test
public void testNoRegionConfigured() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadBucket(BUCKET);
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
}
}
@Test
public void testNoBucketConfigured() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
}
}
@Test
public void testSmallBlockSizeConfigured() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
conf.setS3ManagedLedgerOffloadBucket(BUCKET);
@@ -176,21 +192,21 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
}
}
@Test
public void testGcsNoKeyPath() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
conf.setGcsManagedLedgerOffloadBucket(BUCKET);
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
log.error("Expected pse", pse);
}
@@ -198,7 +214,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
@Test
public void testGcsNoBucketConfigured() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
File tmpKeyFile = File.createTempFile("gcsOffload", "json");
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
@@ -206,7 +222,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
log.error("Expected pse", pse);
}
@@ -214,7 +230,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
@Test
public void testGcsSmallBlockSizeConfigured() throws Exception {
- ServiceConfiguration conf = new ServiceConfiguration();
+ TieredStorageConfigurationData conf = new TieredStorageConfigurationData();
conf.setManagedLedgerOffloadDriver("google-cloud-storage");
File tmpKeyFile = File.createTempFile("gcsOffload", "json");
conf.setGcsManagedLedgerOffloadServiceAccountKeyFile(tmpKeyFile.getAbsolutePath());
@@ -224,7 +240,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
try {
BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
- } catch (PulsarServerException pse) {
+ } catch (IOException pse) {
// correct
log.error("Expected pse", pse);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
rename to
tiered-storage/jcloud/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
new file mode 100644
index 0000000..d8f25b8
--- /dev/null
+++ b/tiered-storage/pom.xml
@@ -0,0 +1,38 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>pom</packaging>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.2.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>tiered-storage-parent</artifactId>
+ <name>Apache Pulsar :: Tiered Storage :: Parent</name>
+
+ <modules>
+ <module>jcloud</module>
+ </modules>
+</project>