This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c8d13c GCS offload support(2): replace `s3client` api with `jclouds`
related api (#2065)
3c8d13c is described below
commit 3c8d13c0546b7fed0100d499bd1841f7ce2a127e
Author: Jia Zhai <[email protected]>
AuthorDate: Fri Jul 20 13:27:05 2018 +0800
GCS offload support(2): replace `s3client` api with `jclouds` related api
(#2065)
This is the second part of the work to support `Google Cloud Storage` offload.
It replaces the "s3 client" API with the "jclouds" API, and makes sure the unit
tests and integration tests still pass.
A follow-up change will add `Google Cloud Storage` support and the
related tests.
Changes:
Replace the `s3client` API with the `jclouds`-related API in
`S3ManagedLedgerOffloader`.
Master Issue: #2067
---
conf/broker.conf | 2 +-
distribution/server/src/assemble/LICENSE.bin.txt | 10 +-
jclouds-shaded/pom.xml | 105 +++++++
pom.xml | 2 +
pulsar-broker/pom.xml | 8 +-
.../org/apache/pulsar/broker/PulsarService.java | 6 +-
...pl.java => BlobStoreBackedInputStreamImpl.java} | 63 ++--
...mpl.java => BlobStoreBackedReadHandleImpl.java} | 49 ++-
...r.java => BlobStoreManagedLedgerOffloader.java} | 220 +++++++++-----
...st.java => BlobStoreBackedInputStreamTest.java} | 114 ++++---
.../{S3TestBase.java => BlobStoreTestBase.java} | 36 ++-
.../org/apache/pulsar/broker/offload/S3Mock.java | 334 ---------------------
...va => BlobStoreManagedLedgerOffloaderTest.java} | 190 ++++++------
...luster-2-bookie-1-broker-unstarted-with-s3.yaml | 4 +-
14 files changed, 518 insertions(+), 625 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 2e40d82..96433aa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -486,7 +486,7 @@
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
### --- Ledger Offloading --- ###
-# Driver to use to offload old data to long term storage (Possible values: S3)
+# Driver to use to offload old data to long term storage (Possible values: S3,
aws-s3, google-cloud-storage)
managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index 33b9748..85a97e5 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -334,10 +334,10 @@ The Apache Software License, Version 2.0
- io.swagger-swagger-annotations-1.5.3.jar
- io.swagger-swagger-core-1.5.3.jar
- io.swagger-swagger-models-1.5.3.jar
- * DataSketches
+ * DataSketches
- com.yahoo.datasketches-memory-0.8.3.jar
- com.yahoo.datasketches-sketches-core-0.8.3.jar
- * Apache Commons
+ * Apache Commons
- commons-beanutils-commons-beanutils-1.7.0.jar
- commons-beanutils-commons-beanutils-core-1.8.0.jar
- commons-cli-commons-cli-1.2.jar
@@ -461,6 +461,12 @@ The Apache Software License, Version 2.0
- org.xerial.snappy-snappy-java-1.1.1.3.jar
* Flatbuffers Java
- com.google.flatbuffers-flatbuffers-java-1.9.0.jar
+ * Apache Jclouds
+ - org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar
+ * Google Guice Core Library
+ - com.google.inject.guice-3.0.jar
+ - com.google.inject.extensions:guice-multibindings-3.0.jar
+ - com.google.inject.extensions:guice-assistedinject-3.0.jar
BSD 3-clause "New" or "Revised" License
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
new file mode 100644
index 0000000..8aa1786
--- /dev/null
+++ b/jclouds-shaded/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>2.2.0-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>jclouds-shaded</artifactId>
+ <name>Apache Pulsar :: Jclouds shaded</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jclouds</groupId>
+ <artifactId>jclouds-allblobstore</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>jclouds-snapshots</id>
+ <url>https://repository.apache.org/content/repositories/snapshots</url>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <minimizeJar>false</minimizeJar>
+
+ <artifactSet>
+ <includes>
+ <include>com.google.code.gson:gson</include>
+ <include>com.google.guava:guava</include>
+ <include>org.apache.jclouds:*</include>
+ <include>org.apache.jclouds.api:*</include>
+ <include>org.apache.jclouds.common:*</include>
+ <include>org.apache.jclouds.provider:*</include>
+
<include>com.google.inject.extensions:guice-assistedinject</include>
+ <include>com.google.inject:guice</include>
+
<include>com.google.inject.extensions:guice-multibindings</include>
+ </includes>
+ </artifactSet>
+
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+
<shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"
/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pom.xml b/pom.xml
index 263d362..6369a10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,8 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-zookeeper</module>
<module>pulsar-log4j2-appender</module>
<module>protobuf-shaded</module>
+ <!-- jclouds shaded for gson conflict:
https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
+ <module>jclouds-shaded</module>
<!-- functions-related modules -->
<module>pulsar-functions</module>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index d505c7a..ac264bd 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -127,7 +127,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
+ <artifactId>aws-java-sdk-core</artifactId>
</dependency>
<!-- functions related dependencies (begin) -->
@@ -273,6 +273,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>jclouds-shaded</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a341af..c0aa9d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -61,7 +61,7 @@ import
org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -657,8 +657,8 @@ public class PulsarService implements AutoCloseable {
public synchronized LedgerOffloader
createManagedLedgerOffloader(ServiceConfiguration conf)
throws PulsarServerException {
if (conf.getManagedLedgerOffloadDriver() != null
- &&
conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME))
{
- return S3ManagedLedgerOffloader.create(conf,
getOffloaderScheduler(conf));
+ &&
BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
{
+ return BlobStoreManagedLedgerOffloader.create(conf,
getOffloaderScheduler(conf));
} else {
return NullLedgerOffloader.INSTANCE;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
similarity index 68%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
index e55e61b..19fac59 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
@@ -18,27 +18,22 @@
*/
package org.apache.pulsar.broker.offload.impl;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-
-import java.io.InputStream;
import java.io.IOException;
-
+import java.io.InputStream;
import org.apache.pulsar.broker.offload.BackedInputStream;
-import
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class S3BackedInputStreamImpl extends BackedInputStream {
- private static final Logger log =
LoggerFactory.getLogger(S3BackedInputStreamImpl.class);
+public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);
- private final AmazonS3 s3client;
+ private final BlobStore blobStore;
private final String bucket;
private final String key;
private final VersionCheck versionCheck;
@@ -50,10 +45,10 @@ public class S3BackedInputStreamImpl extends
BackedInputStream {
private long bufferOffsetStart;
private long bufferOffsetEnd;
- public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String
key,
- VersionCheck versionCheck,
- long objectLen, int bufferSize) {
- this.s3client = s3client;
+ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket,
String key,
+ VersionCheck versionCheck,
+ long objectLen, int bufferSize) {
+ this.blobStore = blobStore;
this.bucket = bucket;
this.key = key;
this.versionCheck = versionCheck;
@@ -76,26 +71,24 @@ public class S3BackedInputStreamImpl extends
BackedInputStream {
long startRange = cursor;
long endRange = Math.min(cursor + bufferSize - 1,
objectLen - 1);
- GetObjectRequest req = new GetObjectRequest(bucket, key)
- .withRange(startRange, endRange);
- log.debug("Reading range {}-{} from {}/{}", startRange, endRange,
bucket, key);
- try (S3Object obj = s3client.getObject(req)) {
- versionCheck.check(key, obj.getObjectMetadata());
-
- Long[] range = obj.getObjectMetadata().getContentRange();
- long bytesRead = range[1] - range[0] + 1;
- buffer.clear();
- bufferOffsetStart = range[0];
- bufferOffsetEnd = range[1];
- InputStream s = obj.getObjectContent();
- int bytesToCopy = (int)bytesRead;
- while (bytesToCopy > 0) {
- bytesToCopy -= buffer.writeBytes(s, bytesToCopy);
+ try {
+ Blob blob = blobStore.getBlob(bucket, key, new
GetOptions().range(startRange, endRange));
+ versionCheck.check(key, blob);
+
+ try (InputStream stream = blob.getPayload().openStream()) {
+ buffer.clear();
+ bufferOffsetStart = startRange;
+ bufferOffsetEnd = endRange;
+ long bytesRead = endRange - startRange + 1;
+ int bytesToCopy = (int) bytesRead;
+ while (bytesToCopy > 0) {
+ bytesToCopy -= buffer.writeBytes(stream, bytesToCopy);
+ }
+ cursor += buffer.readableBytes();
}
- cursor += buffer.readableBytes();
- } catch (AmazonClientException e) {
- throw new IOException("Error reading from S3", e);
+ } catch (Throwable e) {
+ throw new IOException("Error reading from BlobStore", e);
}
}
return true;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
similarity index 82%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
index 08b5ea6..36b382b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,13 +19,8 @@
package org.apache.pulsar.broker.offload.impl;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,7 +28,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -42,18 +36,18 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-
+import org.apache.pulsar.broker.offload.BackedInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.broker.offload.OffloadIndexEntry;
-import org.apache.pulsar.broker.offload.BackedInputStream;
-import
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class S3BackedReadHandleImpl implements ReadHandle {
- private static final Logger log =
LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
+public class BlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
private final long ledgerId;
private final OffloadIndexBlock index;
@@ -61,9 +55,9 @@ public class S3BackedReadHandleImpl implements ReadHandle {
private final DataInputStream dataStream;
private final ExecutorService executor;
- private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
- BackedInputStream inputStream,
- ExecutorService executor) {
+ private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock
index,
+ BackedInputStream inputStream,
+ ExecutorService executor) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
@@ -189,22 +183,19 @@ public class S3BackedReadHandleImpl implements ReadHandle
{
}
public static ReadHandle open(ScheduledExecutorService executor,
- AmazonS3 s3client, String bucket, String
key, String indexKey,
+ BlobStore blobStore, String bucket, String
key, String indexKey,
VersionCheck versionCheck,
long ledgerId, int readBufferSize)
throws AmazonClientException, IOException {
- GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
- try (S3Object obj = s3client.getObject(req)) {
- versionCheck.check(indexKey, obj.getObjectMetadata());
-
- OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create();
- OffloadIndexBlock index =
indexBuilder.fromStream(obj.getObjectContent());
-
- BackedInputStream inputStream = new
S3BackedInputStreamImpl(s3client, bucket, key,
-
versionCheck,
-
index.getDataObjectLength(),
-
readBufferSize);
- return new S3BackedReadHandleImpl(ledgerId, index, inputStream,
executor);
- }
+ Blob blob = blobStore.getBlob(bucket, indexKey);
+ versionCheck.check(indexKey, blob);
+ OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create();
+ OffloadIndexBlock index =
indexBuilder.fromStream(blob.getPayload().openStream());
+
+ BackedInputStream inputStream = new
BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
+ versionCheck,
+ index.getDataObjectLength(),
+ readBufferSize);
+ return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream,
executor);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
similarity index 55%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index ec74d27..528ba28 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -18,24 +18,17 @@
*/
package org.apache.pulsar.broker.offload.impl;
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -47,21 +40,48 @@ import
org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
import org.apache.pulsar.broker.offload.OffloadIndexBlock;
import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
+import org.jclouds.Constants;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.BlobBuilder;
+import org.jclouds.blobstore.domain.MultipartPart;
+import org.jclouds.blobstore.domain.MultipartUpload;
+import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Location;
+import org.jclouds.domain.LocationBuilder;
+import org.jclouds.domain.LocationScope;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.jclouds.s3.reference.S3Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class S3ManagedLedgerOffloader implements LedgerOffloader {
- private static final Logger log =
LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
+public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
- public static final String DRIVER_NAME = "S3";
+ public static final String[] DRIVER_NAMES = {"S3", "aws-s3",
"google-cloud-storage"};
static final String METADATA_FORMAT_VERSION_KEY =
"S3ManagedLedgerOffloaderFormatVersion";
static final String METADATA_SOFTWARE_VERSION_KEY =
"S3ManagedLedgerOffloaderSoftwareVersion";
static final String METADATA_SOFTWARE_GITSHA_KEY =
"S3ManagedLedgerOffloaderSoftwareGitSha";
static final String CURRENT_VERSION = String.valueOf(1);
- private final VersionCheck VERSION_CHECK = (key, metadata) -> {
- String version =
metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
+ public static boolean driverSupported(String driver) {
+ return Arrays.stream(DRIVER_NAMES).anyMatch(d ->
d.equalsIgnoreCase(driver));
+ }
+
+ private static void addVersionInfo(BlobBuilder blobBuilder) {
+ blobBuilder.userMetadata(ImmutableMap.of(
+ METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
+ METADATA_SOFTWARE_VERSION_KEY,
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+ METADATA_SOFTWARE_GITSHA_KEY,
PulsarBrokerVersionStringUtils.getGitSha()));
+ }
+
+ private final VersionCheck VERSION_CHECK = (key, blob) -> {
+ // NOTE all metadata in jclouds comes out as lowercase, in an effort
to normalize the providers
+ String version =
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
if (version == null || !version.equals(CURRENT_VERSION)) {
throw new IOException(String.format("Invalid object version %s for
%s, expect %s",
version, key,
CURRENT_VERSION));
@@ -69,15 +89,21 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
};
private final OrderedScheduler scheduler;
- private final AmazonS3 s3client;
+
+ // container in jclouds
private final String bucket;
// max block size for each data block.
private int maxBlockSize;
private final int readBufferSize;
- public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
- OrderedScheduler scheduler)
+ private BlobStoreContext context;
+ private BlobStore blobStore;
+ Location location = null;
+
+ public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration
conf,
+ OrderedScheduler
scheduler)
throws PulsarServerException {
+ String driver = conf.getManagedLedgerOffloadDriver();
String region = conf.getS3ManagedLedgerOffloadRegion();
String bucket = conf.getS3ManagedLedgerOffloadBucket();
String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
@@ -96,23 +122,66 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
throw new
PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less
than 5MB");
}
- AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+ return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler,
maxBlockSize, readBufferSize, endpoint, region);
+ }
+
+ // build context for jclouds BlobStoreContext
+ BlobStoreManagedLedgerOffloader(String driver, String container,
OrderedScheduler scheduler,
+ int maxBlockSize, int readBufferSize,
String endpoint, String region) {
+ this.scheduler = scheduler;
+ this.readBufferSize = readBufferSize;
+
+ this.bucket = container;
+ this.maxBlockSize = maxBlockSize;
+
+ Properties overrides = new Properties();
+ // This property controls the number of parts being uploaded in
parallel.
+ overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+ overrides.setProperty("jclouds.mpu.parts.size",
Integer.toString(maxBlockSize));
+ overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+ overrides.setProperty(Constants.PROPERTY_MAX_RETRIES,
Integer.toString(100));
+
+ ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+
+ AWSCredentials credentials = null;
+ try {
+ DefaultAWSCredentialsProviderChain creds =
DefaultAWSCredentialsProviderChain.getInstance();
+ credentials = creds.getCredentials();
+ } catch (Exception e) {
+ log.error("Exception when get credentials for s3 ", e);
+ }
+
+ String id = "accesskey";
+ String key = "secretkey";
+ if (credentials != null) {
+ id = credentials.getAWSAccessKeyId();
+ key = credentials.getAWSSecretKey();
+ }
+ contextBuilder.credentials(id, key);
+
if (!Strings.isNullOrEmpty(endpoint)) {
- builder.setEndpointConfiguration(new
EndpointConfiguration(endpoint, region));
- builder.setPathStyleAccessEnabled(true);
- } else {
- builder.setRegion(region);
+ contextBuilder.endpoint(endpoint);
+
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+ }
+ if (!Strings.isNullOrEmpty(region)) {
+ this.location = new
LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
}
- return new S3ManagedLedgerOffloader(builder.build(), bucket,
scheduler, maxBlockSize, readBufferSize);
+
+ log.info("Constructor driver: {}, host: {}, container: {}, region: {}
", driver, endpoint, bucket, region);
+
+ contextBuilder.overrides(overrides);
+ this.context = contextBuilder.buildView(BlobStoreContext.class);
+ this.blobStore = context.getBlobStore();
}
- S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket,
OrderedScheduler scheduler,
- int maxBlockSize, int readBufferSize) {
- this.s3client = s3client;
- this.bucket = bucket;
+ // build context for jclouds BlobStoreContext
+ BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container,
OrderedScheduler scheduler,
+ int maxBlockSize, int readBufferSize) {
this.scheduler = scheduler;
- this.maxBlockSize = maxBlockSize;
this.readBufferSize = readBufferSize;
+ this.bucket = container;
+ this.maxBlockSize = maxBlockSize;
+ this.blobStore = blobStore;
}
static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -141,15 +210,15 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
String dataBlockKey = dataBlockOffloadKey(readHandle.getId(),
uuid);
String indexBlockKey = indexBlockOffloadKey(readHandle.getId(),
uuid);
- ObjectMetadata dataMetadata = new ObjectMetadata();
- addVersionInfo(dataMetadata);
-
- InitiateMultipartUploadRequest dataBlockReq = new
InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata);
- InitiateMultipartUploadResult dataBlockRes = null;
+ MultipartUpload mpu = null;
+ List<MultipartPart> parts = Lists.newArrayList();
// init multi part upload for data block.
try {
- dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+ addVersionInfo(blobBuilder);
+ Blob blob = blobBuilder.build();
+ mpu = blobStore.initiateMultipartUpload(bucket,
blob.getMetadata(), new PutOptions());
} catch (Throwable t) {
promise.completeExceptionally(t);
return;
@@ -161,7 +230,6 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
long startEntry = 0;
int partId = 1;
long entryBytesWritten = 0;
- List<PartETag> etags = new LinkedList<>();
while (startEntry <= readHandle.getLastAddConfirmed()) {
int blockSize = BlockAwareSegmentInputStreamImpl
.calculateBlockSize(maxBlockSize, readHandle,
startEntry, entryBytesWritten);
@@ -169,15 +237,13 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
try (BlockAwareSegmentInputStream blockStream = new
BlockAwareSegmentInputStreamImpl(
readHandle, startEntry, blockSize)) {
- UploadPartResult uploadRes = s3client.uploadPart(
- new UploadPartRequest()
- .withBucketName(bucket)
- .withKey(dataBlockKey)
- .withUploadId(dataBlockRes.getUploadId())
- .withInputStream(blockStream)
- .withPartSize(blockSize)
- .withPartNumber(partId));
- etags.add(uploadRes.getPartETag());
+ Payload partPayload =
Payloads.newInputStreamPayload(blockStream);
+
partPayload.getContentMetadata().setContentLength((long)blockSize);
+
partPayload.getContentMetadata().setContentType("application/octet-stream");
+ parts.add(blobStore.uploadMultipartPart(mpu, partId,
partPayload));
+ log.debug("UploadMultipartPart. container: {},
blobName: {}, partId: {}, mpu: {}",
+ bucket, dataBlockKey, partId, mpu.id());
+
indexBuilder.addBlock(startEntry, partId, blockSize);
if (blockStream.getEndEntryId() != -1) {
@@ -193,17 +259,16 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
dataObjectLength += blockSize;
}
- s3client.completeMultipartUpload(new
CompleteMultipartUploadRequest()
- .withBucketName(bucket).withKey(dataBlockKey)
- .withUploadId(dataBlockRes.getUploadId())
- .withPartETags(etags));
+ blobStore.completeMultipartUpload(mpu, parts);
+ mpu = null;
} catch (Throwable t) {
try {
- s3client.abortMultipartUpload(
- new AbortMultipartUploadRequest(bucket, dataBlockKey,
dataBlockRes.getUploadId()));
+ if (mpu != null) {
+ blobStore.abortMultipartUpload(mpu);
+ }
} catch (Throwable throwable) {
log.error("Failed abortMultipartUpload in bucket - {} with
key - {}, uploadId - {}.",
- bucket, dataBlockKey, dataBlockRes.getUploadId(),
throwable);
+ bucket, dataBlockKey, mpu.id(), throwable);
}
promise.completeExceptionally(t);
return;
@@ -213,19 +278,22 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
try (OffloadIndexBlock index =
indexBuilder.withDataObjectLength(dataObjectLength).build();
OffloadIndexBlock.IndexInputStream indexStream =
index.toStream()) {
// write the index block
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(indexStream.getStreamSize());
- addVersionInfo(metadata);
-
- s3client.putObject(new PutObjectRequest(
- bucket,
- indexBlockKey,
- indexStream,
- metadata));
+ BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+ addVersionInfo(blobBuilder);
+ Payload indexPayload =
Payloads.newInputStreamPayload(indexStream);
+
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
+
indexPayload.getContentMetadata().setContentType("application/octet-stream");
+
+ Blob blob = blobBuilder
+ .payload(indexPayload)
+ .contentLength((long)indexStream.getStreamSize())
+ .build();
+
+ blobStore.putBlob(bucket, blob);
promise.complete(null);
} catch (Throwable t) {
try {
- s3client.deleteObject(bucket, dataBlockKey);
+ blobStore.removeBlob(bucket, dataBlockKey);
} catch (Throwable throwable) {
log.error("Failed deleteObject in bucket - {} with key -
{}.",
bucket, dataBlockKey, throwable);
@@ -244,35 +312,31 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
String indexKey = indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).submit(() -> {
try {
-
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
- s3client,
+
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+ blobStore,
bucket, key,
indexKey,
VERSION_CHECK,
ledgerId,
readBufferSize));
} catch (Throwable t) {
+ log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
}
});
return promise;
}
- private static void addVersionInfo(ObjectMetadata metadata) {
- metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY,
CURRENT_VERSION);
- metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY,
-
PulsarBrokerVersionStringUtils.getNormalizedVersionString());
- metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY,
PulsarBrokerVersionStringUtils.getGitSha());
- }
+
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(ledgerId).submit(() -> {
try {
- s3client.deleteObjects(new DeleteObjectsRequest(bucket)
- .withKeys(dataBlockOffloadKey(ledgerId, uid),
indexBlockOffloadKey(ledgerId, uid)));
+ blobStore.removeBlobs(bucket,
+ ImmutableList.of(dataBlockOffloadKey(ledgerId, uid),
indexBlockOffloadKey(ledgerId, uid)));
promise.complete(null);
} catch (Throwable t) {
- log.error("Failed delete s3 Object ", t);
+ log.error("Failed delete Blob", t);
promise.completeExceptionally(t);
}
});
@@ -281,7 +345,7 @@ public class S3ManagedLedgerOffloader implements
LedgerOffloader {
}
public interface VersionCheck {
- void check(String key, ObjectMetadata md) throws IOException;
+ void check(String key, Blob blob) throws IOException;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
similarity index 61%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
index 45bca52..bde4dde 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
@@ -18,29 +18,35 @@
*/
package org.apache.pulsar.broker.offload;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.spy;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-
import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl;
-
+import org.apache.pulsar.broker.offload.impl.BlobStoreBackedInputStreamImpl;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
-class S3BackedInputStreamTest extends S3TestBase {
+class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class);
+
class RandomInputStream extends InputStream {
final Random r;
int bytesRemaining;
@@ -85,16 +91,21 @@ class S3BackedInputStreamTest extends S3TestBase {
@Test
public void testReadingFullObject() throws Exception {
- String objectKey = "foobar";
+ String objectKey = "testReadingFull";
int objectSize = 12345;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
RandomInputStream toCompare = new RandomInputStream(0, objectSize);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(objectSize);
- s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+ Payload payload = Payloads.newInputStreamPayload(toWrite);
+ payload.getContentMetadata().setContentLength((long)objectSize);
+ Blob blob = blobStore.blobBuilder(objectKey)
+ .payload(payload)
+ .contentLength((long)objectSize)
+ .build();
+ String ret = blobStore.putBlob(BUCKET, blob);
+ log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}",
objectKey, BUCKET, ret);
- BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
assertStreamsMatch(toTest, toCompare);
@@ -102,24 +113,29 @@ class S3BackedInputStreamTest extends S3TestBase {
@Test
public void testReadingFullObjectByBytes() throws Exception {
- String objectKey = "foobar";
+ String objectKey = "testReadingFull2";
int objectSize = 12345;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
RandomInputStream toCompare = new RandomInputStream(0, objectSize);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(objectSize);
- s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+ Payload payload = Payloads.newInputStreamPayload(toWrite);
+ payload.getContentMetadata().setContentLength((long)objectSize);
+ Blob blob = blobStore.blobBuilder(objectKey)
+ .payload(payload)
+ .contentLength((long)objectSize)
+ .build();
+ String ret = blobStore.putBlob(BUCKET, blob);
+ log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}",
objectKey, BUCKET, ret);
- BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
assertStreamsMatchByBytes(toTest, toCompare);
}
@Test(expectedExceptions = IOException.class)
- public void testErrorOnS3Read() throws Exception {
- BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, "doesn't exist",
+ public void testErrorOnRead() throws Exception {
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist",
(key, md) ->
{},
1234, 1000);
toTest.read();
@@ -128,7 +144,7 @@ class S3BackedInputStreamTest extends S3TestBase {
@Test
public void testSeek() throws Exception {
- String objectKey = "foobar";
+ String objectKey = "testSeek";
int objectSize = 12345;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
@@ -141,11 +157,16 @@ class S3BackedInputStreamTest extends S3TestBase {
seeks.put(seek, stream);
}
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(objectSize);
- s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+ Payload payload = Payloads.newInputStreamPayload(toWrite);
+ payload.getContentMetadata().setContentLength((long)objectSize);
+ Blob blob = blobStore.blobBuilder(objectKey)
+ .payload(payload)
+ .contentLength((long)objectSize)
+ .build();
+ String ret = blobStore.putBlob(BUCKET, blob);
+ log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}",
objectKey, BUCKET, ret);
- BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
@@ -156,16 +177,23 @@ class S3BackedInputStreamTest extends S3TestBase {
@Test
public void testSeekWithinCurrent() throws Exception {
- String objectKey = "foobar";
+ String objectKey = "testSeekWithinCurrent";
int objectSize = 12345;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(objectSize);
- s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+ Payload payload = Payloads.newInputStreamPayload(toWrite);
+ payload.getContentMetadata().setContentLength((long)objectSize);
+ Blob blob = blobStore.blobBuilder(objectKey)
+ .payload(payload)
+ .contentLength((long)objectSize)
+ .build();
+ String ret = blobStore.putBlob(BUCKET, blob);
+ log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}",
objectKey, BUCKET, ret);
+
+ //BlobStore spiedBlobStore = spy(blobStore);
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
- AmazonS3 spiedClient = spy(s3client);
- BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient,
BUCKET, objectKey,
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
@@ -193,20 +221,26 @@ class S3BackedInputStreamTest extends S3TestBase {
Assert.assertEquals(thirdSeek.read(), toTest.read());
}
- verify(spiedClient, times(1)).getObject(anyObject());
+ verify(spiedBlobStore, times(1))
+ .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey),
Matchers.<GetOptions>anyObject());
}
@Test
public void testSeekForward() throws Exception {
- String objectKey = "foobar";
+ String objectKey = "testSeekForward";
int objectSize = 12345;
RandomInputStream toWrite = new RandomInputStream(0, objectSize);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(objectSize);
- s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+ Payload payload = Payloads.newInputStreamPayload(toWrite);
+ payload.getContentMetadata().setContentLength((long)objectSize);
+ Blob blob = blobStore.blobBuilder(objectKey)
+ .payload(payload)
+ .contentLength((long)objectSize)
+ .build();
+ String ret = blobStore.putBlob(BUCKET, blob);
+ log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}",
objectKey, BUCKET, ret);
- BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
similarity index 53%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
index f2ea6c4..d1474f9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
@@ -18,27 +18,39 @@
*/
package org.apache.pulsar.broker.offload;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
-public class S3TestBase {
+public class BlobStoreTestBase {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreTestBase.class);
+
public final static String BUCKET = "pulsar-unittest";
- protected AmazonS3 s3client = null;
+ protected BlobStoreContext context = null;
+ protected BlobStore blobStore = null;
@BeforeMethod
public void start() throws Exception {
- if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
- // To use this, ~/.aws must be configured with credentials and a
default region
- s3client = AmazonS3ClientBuilder.standard().build();
- } else {
- s3client = new S3Mock();
+ context =
ContextBuilder.newBuilder("transient").build(BlobStoreContext.class);
+ blobStore = context.getBlobStore();
+ boolean create = blobStore.createContainerInLocation(null, BUCKET);
+
+ log.debug("TestBase Create Bucket: {}, in blobStore, result: {}",
BUCKET, create);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (blobStore != null) {
+ blobStore.deleteContainer(BUCKET);
}
- if (!s3client.doesBucketExistV2(BUCKET)) {
- s3client.createBucket(BUCKET);
+ if (context != null) {
+ context.close();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
deleted file mode 100644
index 4bfc140..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.offload;
-
-import com.amazonaws.services.s3.AbstractAmazonS3;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.Bucket;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectResult;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
-import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-
-import com.google.common.collect.ComparisonChain;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.stream.Collectors;
-
-/**
- * Minimal mock for amazon client.
- * If making any changes, validate they behave the same as S3 by running all
S3 tests with -DtestRealAWS=true
- */
-class S3Mock extends AbstractAmazonS3 {
- @Override
- public boolean doesBucketExistV2(String bucketName) {
- return buckets.containsKey(bucketName);
- }
-
- @Override
- public boolean doesObjectExist(String bucketName, String objectName) {
- return buckets.containsKey(bucketName) &&
getBucket(bucketName).hasObject(objectName);
- }
-
- @Override
- public Bucket createBucket(String bucketName) {
- return buckets.computeIfAbsent(bucketName, (k) -> new MockBucket(k));
- }
-
- private MockBucket getBucket(String bucketName) throws AmazonS3Exception {
- MockBucket bucket = buckets.get(bucketName);
- if (bucket != null) {
- return bucket;
- } else {
- throw new AmazonS3Exception("NoSuchBucket: Bucket doesn't exist");
- }
- }
-
- @Override
- public PutObjectResult putObject(PutObjectRequest putObjectRequest)
- throws AmazonS3Exception {
- return
getBucket(putObjectRequest.getBucketName()).putObject(putObjectRequest);
- }
-
- @Override
- public S3Object getObject(GetObjectRequest getObjectRequest) {
- return
getBucket(getObjectRequest.getBucketName()).getObject(getObjectRequest);
- }
-
- @Override
- public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest
getObjectMetadataRequest)
- throws AmazonS3Exception {
- return
getBucket(getObjectMetadataRequest.getBucketName()).getObjectMetadata(getObjectMetadataRequest);
- }
-
- @Override
- public void deleteObject(DeleteObjectRequest deleteObjectRequest)
- throws AmazonS3Exception {
-
getBucket(deleteObjectRequest.getBucketName()).deleteObject(deleteObjectRequest.getKey());
- }
-
- @Override
- public DeleteObjectsResult deleteObjects(DeleteObjectsRequest
deleteObjectsRequest)
- throws AmazonS3Exception {
- List<DeleteObjectsResult.DeletedObject> results =
deleteObjectsRequest.getKeys().stream().map((k) -> {
-
getBucket(deleteObjectsRequest.getBucketName()).deleteObject(k.getKey());
- DeleteObjectsResult.DeletedObject res = new
DeleteObjectsResult.DeletedObject();
- res.setKey(k.getKey());
- return res;
- }).collect(Collectors.toList());
- return new DeleteObjectsResult(results);
- }
-
- @Override
- public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
- throws AmazonS3Exception {
- S3Object from = getObject(new
GetObjectRequest(copyObjectRequest.getSourceBucketName(),
-
copyObjectRequest.getSourceKey()));
- ObjectMetadata newMetadata = copyObjectRequest.getNewObjectMetadata();
- if (newMetadata == null) {
- newMetadata = from.getObjectMetadata();
- }
-
newMetadata.setContentLength(from.getObjectMetadata().getContentLength());
- putObject(new
PutObjectRequest(copyObjectRequest.getDestinationBucketName(),
- copyObjectRequest.getDestinationKey(),
- from.getObjectContent(),
- newMetadata));
- return new CopyObjectResult();
- }
-
- @Override
- public InitiateMultipartUploadResult
initiateMultipartUpload(InitiateMultipartUploadRequest request)
- throws AmazonS3Exception {
- return getBucket(request.getBucketName()).initMultipart(request);
- }
-
- @Override
- public UploadPartResult uploadPart(UploadPartRequest request)
- throws AmazonS3Exception {
- return getBucket(request.getBucketName()).uploadPart(request);
- }
-
- @Override
- public CompleteMultipartUploadResult
completeMultipartUpload(CompleteMultipartUploadRequest request)
- throws AmazonS3Exception {
- return getBucket(request.getBucketName()).completeMultipart(request);
- }
-
- ConcurrentHashMap<String, MockBucket> buckets = new ConcurrentHashMap<>();
-
- static class MockBucket extends Bucket {
- ConcurrentHashMap<String, MockObject> objects = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, MockMultipart> inprogressMultipart = new
ConcurrentHashMap<>();
-
- MockBucket(String name) {
- super(name);
- }
-
- boolean hasObject(String key) {
- return objects.containsKey(key);
- }
-
- PutObjectResult putObject(PutObjectRequest putObjectRequest) throws
AmazonS3Exception {
- byte[] bytes = streamToBytes(putObjectRequest.getInputStream(),
-
(int)putObjectRequest.getMetadata().getContentLength());
- objects.put(putObjectRequest.getKey(),
- new MockObject(putObjectRequest.getMetadata(), bytes));
- return new PutObjectResult();
- }
-
- S3Object getObject(GetObjectRequest getObjectRequest) throws
AmazonS3Exception {
- MockObject obj = objects.get(getObjectRequest.getKey());
- if (obj == null) {
- throw new AmazonS3Exception("Object doesn't exist");
- }
-
- S3Object s3obj = new S3Object();
- s3obj.setBucketName(getObjectRequest.getBucketName());
- s3obj.setKey(getObjectRequest.getKey());
-
- if (getObjectRequest.getRange() != null) {
- long[] range = getObjectRequest.getRange();
- int size = (int)(range[1] - range[0] + 1);
- ObjectMetadata metadata = obj.metadata.clone();
- metadata.setHeader("Content-Range",
- String.format("bytes %d-%d/%d",
- range[0], range[1], size));
- s3obj.setObjectMetadata(metadata);
- s3obj.setObjectContent(new ByteArrayInputStream(obj.data,
(int)range[0], size));
- return s3obj;
- } else {
- s3obj.setObjectMetadata(obj.metadata);
- s3obj.setObjectContent(new ByteArrayInputStream(obj.data));
- return s3obj;
- }
- }
-
- void deleteObject(String key) {
- objects.remove(key);
- }
-
- ObjectMetadata getObjectMetadata(GetObjectMetadataRequest
getObjectMetadataRequest)
- throws AmazonS3Exception {
- MockObject obj = objects.get(getObjectMetadataRequest.getKey());
- if (obj == null) {
- throw new AmazonS3Exception("Object doesn't exist");
- }
- return obj.metadata;
- }
-
- InitiateMultipartUploadResult
initMultipart(InitiateMultipartUploadRequest request)
- throws AmazonS3Exception {
- String uploadId = UUID.randomUUID().toString();
- inprogressMultipart.put(uploadId, new
MockMultipart(request.getKey(),
-
request.getObjectMetadata()));
- InitiateMultipartUploadResult result = new
InitiateMultipartUploadResult();
- result.setBucketName(request.getBucketName());
- result.setKey(request.getKey());
- result.setUploadId(uploadId);
- return result;
- }
-
- MockMultipart getMultipart(String uploadId, String key) throws
AmazonS3Exception {
- MockMultipart multi = inprogressMultipart.get(uploadId);
- if (multi == null) {
- throw new AmazonS3Exception("No such upload " + uploadId);
- }
- if (!multi.key.equals(key)) {
- throw new AmazonS3Exception("Wrong key for upload " + uploadId
- + ", expected " + key
- + ", got " + multi.key);
- }
- return multi;
- }
-
- UploadPartResult uploadPart(UploadPartRequest request)
- throws AmazonS3Exception {
- MockMultipart multi = getMultipart(request.getUploadId(),
request.getKey());
- byte[] bytes = streamToBytes(request.getInputStream(),
(int)request.getPartSize());
- UploadPartResult result = new UploadPartResult();
- result.setPartNumber(request.getPartNumber());
- result.setETag(multi.addPart(request.getPartNumber(), bytes));
- return result;
- }
-
- CompleteMultipartUploadResult
completeMultipart(CompleteMultipartUploadRequest request)
- throws AmazonS3Exception {
- MockMultipart multi = getMultipart(request.getUploadId(),
request.getKey());
- inprogressMultipart.remove(request.getUploadId());
- objects.put(request.getKey(),
multi.complete(request.getPartETags()));
- CompleteMultipartUploadResult result = new
CompleteMultipartUploadResult();
- result.setBucketName(request.getBucketName());
- result.setKey(request.getKey());
- return result;
- }
- }
-
- private static byte[] streamToBytes(InputStream data, int length) throws
AmazonS3Exception {
- byte[] bytes = new byte[length];
- try {
- for (int i = 0; i < length; i++) {
- bytes[i] = (byte)data.read();
- }
- } catch (IOException ioe) {
- throw new AmazonS3Exception("Error loading data", ioe);
- }
- return bytes;
- }
-
- static class MockObject {
- final ObjectMetadata metadata;
- final byte[] data;
- final Map<Integer, long[]> partRanges;
-
-
- MockObject(ObjectMetadata metadata, byte[] data) {
- this(metadata, data, null);
- }
-
- MockObject(ObjectMetadata metadata, byte[] data, Map<Integer, long[]>
partRanges) {
- this.metadata = metadata;
- this.data = data;
- this.partRanges = partRanges;
- }
-
- }
-
- static class MockMultipart {
- final String key;
- final ObjectMetadata metadata;
- final ConcurrentSkipListMap<PartETag, byte[]> parts = new
ConcurrentSkipListMap<>(
- (etag1, etag2) ->
ComparisonChain.start().compare(etag1.getPartNumber(),
-
etag2.getPartNumber()).result());
-
- MockMultipart(String key, ObjectMetadata metadata) {
- this.key = key;
- this.metadata = metadata;
- }
-
- String addPart(int partNumber, byte[] bytes) {
- String etag = UUID.randomUUID().toString();
- parts.put(new PartETag(partNumber, etag), bytes);
- return etag;
- }
-
- MockObject complete(List<PartETag> tags) throws AmazonS3Exception {
- if (parts.size() != tags.size()
- || !parts.keySet().containsAll(tags)) {
- throw new AmazonS3Exception("Tags don't match uploaded parts");
- }
-
- int totalSize = parts.values().stream().map(v ->
v.length).reduce(0, (acc, v) -> acc + v);
- byte[] full = new byte[totalSize];
-
- Map<Integer, long[]> partRanges = new HashMap<>();
- int start = 0;
- for (Map.Entry<PartETag, byte[]> e : parts.entrySet()) {
- int partLength = e.getValue().length;
- System.arraycopy(e.getValue(), 0, full, start, partLength);
- partRanges.put(e.getKey().getPartNumber(),
- new long[] { start, start + partLength - 1 });
- start += partLength;
- }
- metadata.setContentLength(totalSize);
- return new MockObject(metadata, full, partRanges);
- }
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
similarity index 71%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 7b9d9a2..19463d2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -18,24 +18,23 @@
*/
package org.apache.pulsar.broker.offload.impl;
-import static
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey;
-import static
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey;
+import static
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey;
+import static
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey;
+import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import java.lang.reflect.Method;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -51,21 +50,25 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.offload.S3TestBase;
-import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.BlobStoreTestBase;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.options.CopyOptions;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
-class S3ManagedLedgerOffloaderTest extends S3TestBase {
+class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
+ private static final Logger log =
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+
private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
final OrderedScheduler scheduler;
final MockBookKeeper bk;
- S3ManagedLedgerOffloaderTest() throws Exception {
+ BlobStoreManagedLedgerOffloaderTest() throws Exception {
scheduler =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
bk = new
MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
}
@@ -114,31 +117,32 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testHappyCase() throws Exception {
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(buildReadHandle(), UUID.randomUUID(), new
HashMap<>()).get();
}
@Test
public void testBucketDoesNotExist() throws Exception {
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
"no-bucket", scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, "no-bucket", scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
try {
offloader.offload(buildReadHandle(), UUID.randomUUID(), new
HashMap<>()).get();
Assert.fail("Shouldn't be able to add to bucket");
} catch (ExecutionException e) {
- Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
+ log.error("Exception: ", e.getMessage());
+ Assert.assertTrue(e.getMessage().contains("not found"));
}
}
@Test
public void testNoRegionConfigured() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
-
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+ conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadBucket(BUCKET);
try {
- S3ManagedLedgerOffloader.create(conf, scheduler);
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
@@ -148,11 +152,11 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testNoBucketConfigured() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
-
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+ conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
try {
- S3ManagedLedgerOffloader.create(conf, scheduler);
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
@@ -162,13 +166,13 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testSmallBlockSizeConfigured() throws Exception {
ServiceConfiguration conf = new ServiceConfiguration();
-
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+ conf.setManagedLedgerOffloadDriver("s3");
conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
conf.setS3ManagedLedgerOffloadBucket(BUCKET);
conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(1024);
try {
- S3ManagedLedgerOffloader.create(conf, scheduler);
+ BlobStoreManagedLedgerOffloader.create(conf, scheduler);
Assert.fail("Should have thrown exception");
} catch (PulsarServerException pse) {
// correct
@@ -178,7 +182,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testOffloadAndRead() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -213,21 +217,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
// mock throw exception when initiateMultipartUpload
try {
- AmazonS3 mockS3client = Mockito.spy(s3client);
+
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
- .doThrow(new AmazonServiceException(failureString))
- .when(mockS3client).initiateMultipartUpload(any());
+ .doThrow(new RuntimeException(failureString))
+ .when(spiedBlobStore).initiateMultipartUpload(any(), any(),
any());
- LedgerOffloader offloader = new
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception when initiateMultipartUpload");
} catch (Exception e) {
// expected
- Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@@ -239,22 +244,21 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
// mock throw exception when uploadPart
try {
- AmazonS3 mockS3client = Mockito.spy(s3client);
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
- .doThrow(new AmazonServiceException("fail
DataBlockPartUpload"))
- .when(mockS3client).uploadPart(any());
- Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+ .doThrow(new RuntimeException(failureString))
+ .when(spiedBlobStore).uploadMultipartPart(any(), anyInt(),
any());
- LedgerOffloader offloader = new
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
Assert.fail("Should throw exception for when uploadPart");
} catch (Exception e) {
// expected
- Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@@ -266,22 +270,25 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
// mock throw exception when completeMultipartUpload
try {
- AmazonS3 mockS3client = Mockito.spy(s3client);
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
+ Mockito
+ .doThrow(new RuntimeException(failureString))
+ .when(spiedBlobStore).completeMultipartUpload(any(), any());
Mockito
- .doThrow(new AmazonServiceException(failureString))
- .when(mockS3client).completeMultipartUpload(any());
- Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+ .doNothing()
+ .when(spiedBlobStore).abortMultipartUpload(any());
- LedgerOffloader offloader = new
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
Assert.fail("Should throw exception for when
completeMultipartUpload");
} catch (Exception e) {
// expected
- Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@@ -293,21 +300,22 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
// mock throw exception when putObject
try {
- AmazonS3 mockS3client = Mockito.spy(s3client);
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
Mockito
- .doThrow(new AmazonServiceException(failureString))
- .when(mockS3client).putObject(any());
+ .doThrow(new RuntimeException(failureString))
+ .when(spiedBlobStore).putBlob(any(), any());
- LedgerOffloader offloader = new
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
Assert.fail("Should throw exception for when putObject for index
block");
} catch (Exception e) {
// expected
- Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+ Assert.assertTrue(e.getCause() instanceof RuntimeException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@@ -328,7 +336,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
randomAccesses[i][1] = second;
}
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -360,7 +368,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testOffloadReadInvalidEntryIds() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -385,46 +393,47 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
public void testDeleteOffloaded() throws Exception {
ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
UUID uuid = UUID.randomUUID();
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
// verify object exist after offload
offloader.offload(readHandle, uuid, new HashMap<>()).get();
- Assert.assertTrue(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertTrue(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
// verify object deleted after delete
offloader.deleteOffloaded(readHandle.getId(), uuid).get();
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertFalse(s3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertFalse(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
@Test
public void testDeleteOffloadedFail() throws Exception {
+ String failureString = "fail deleteOffloaded";
ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
UUID uuid = UUID.randomUUID();
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
-
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
- String failureString = "fail deleteOffloaded";
- AmazonS3 mockS3client = Mockito.spy(s3client);
+ BlobStore spiedBlobStore = mock(BlobStore.class,
delegatesTo(blobStore));
+
Mockito
- .doThrow(new AmazonServiceException(failureString))
- .when(mockS3client).deleteObjects(any());
+ .doThrow(new RuntimeException(failureString))
+ .when(spiedBlobStore).removeBlobs(any(), any());
+
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+ DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
try {
// verify object exist after offload
offloader.offload(readHandle, uuid, new HashMap<>()).get();
- Assert.assertTrue(mockS3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertTrue(mockS3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
offloader.deleteOffloaded(readHandle.getId(), uuid).get();
} catch (Exception e) {
// expected
- Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
// verify object still there.
- Assert.assertTrue(mockS3client.doesObjectExist(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
- Assert.assertTrue(mockS3client.doesObjectExist(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
dataBlockOffloadKey(readHandle.getId(), uuid)));
+ Assert.assertTrue(blobStore.blobExists(BUCKET,
indexBlockOffloadKey(readHandle.getId(), uuid)));
}
}
@@ -441,7 +450,7 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
Mockito.doReturn(1234L).when(readHandle).getId();
UUID uuid = UUID.randomUUID();
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
try {
offloader.offload(readHandle, uuid, new HashMap<>()).get();
@@ -454,48 +463,51 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
@Test
public void testReadUnknownDataVersion() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid);
- ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey);
-
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(-12345));
- s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET,
dataKey).withNewObjectMetadata(md));
+
+ Map<String, String> userMeta = blobStore.blobMetadata(BUCKET,
dataKey).getUserMetadata();
+
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(-12345));
+ blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey,
CopyOptions.builder().userMetadata(userMeta).build());
try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(),
uuid).get()) {
toRead.readAsync(0, 0).get();
Assert.fail("Shouldn't have been able to read");
} catch (ExecutionException e) {
+ log.error("Exception: ", e);
Assert.assertEquals(e.getCause().getClass(), IOException.class);
- Assert.assertTrue(e.getCause().getMessage().contains("Invalid
object version"));
+ Assert.assertTrue(e.getCause().getMessage().contains("Error
reading from BlobStore"));
}
-
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(12345));
- s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET,
dataKey).withNewObjectMetadata(md));
+
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(12345));
+ blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey,
CopyOptions.builder().userMetadata(userMeta).build());
try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(),
uuid).get()) {
toRead.readAsync(0, 0).get();
Assert.fail("Shouldn't have been able to read");
} catch (ExecutionException e) {
Assert.assertEquals(e.getCause().getClass(), IOException.class);
- Assert.assertTrue(e.getCause().getMessage().contains("Invalid
object version"));
+ Assert.assertTrue(e.getCause().getMessage().contains("Error
reading from BlobStore"));
}
}
@Test
public void testReadUnknownIndexVersion() throws Exception {
ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
- LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client,
BUCKET, scheduler,
+ LedgerOffloader offloader = new
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid);
- ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey);
-
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(-12345));
- s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET,
indexKey).withNewObjectMetadata(md));
+
+ Map<String, String> userMeta = blobStore.blobMetadata(BUCKET,
indexKey).getUserMetadata();
+
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(-12345));
+ blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey,
CopyOptions.builder().userMetadata(userMeta).build());
try {
offloader.readOffloaded(toWrite.getId(), uuid).get();
@@ -505,8 +517,8 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
Assert.assertTrue(e.getCause().getMessage().contains("Invalid
object version"));
}
-
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(12345));
- s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET,
indexKey).withNewObjectMetadata(md));
+
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY,
String.valueOf(12345));
+ blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey,
CopyOptions.builder().userMetadata(userMeta).build());
try {
offloader.readOffloaded(toWrite.getId(), uuid).get();
diff --git
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
index 6b8eaa4..ec46571 100644
---
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
+++
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
@@ -148,7 +148,9 @@ pulsar-proxy*:
networkMode: pulsarnet*
s3*:
- image: adobe/s3mock
+ ## use latest adobe/s3mock, for issue:
https://github.com/adobe/S3Mock/issues/32
+ ## TODO: https://github.com/apache/incubator-pulsar/issues/2133
+ image: apachepulsar/s3mock
await:
strategy: org.apache.pulsar.tests.NoopAwaitStrategy
env: