sijie closed pull request #2065: GCS offload support(2): replace `s3client` api 
with `jclouds` related api 
URL: https://github.com/apache/incubator-pulsar/pull/2065
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork-based) pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/conf/broker.conf b/conf/broker.conf
index 2e40d82a87..96433aa517 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -486,7 +486,7 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 
 ### --- Ledger Offloading --- ###
 
-# Driver to use to offload old data to long term storage (Possible values: S3)
+# Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage)
 managedLedgerOffloadDriver=
 
 # Maximum number of thread pool threads for ledger offloading
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 33b97481da..85a97e5441 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -334,10 +334,10 @@ The Apache Software License, Version 2.0
     - io.swagger-swagger-annotations-1.5.3.jar
     - io.swagger-swagger-core-1.5.3.jar
     - io.swagger-swagger-models-1.5.3.jar
- * DataSketches 
+ * DataSketches
     - com.yahoo.datasketches-memory-0.8.3.jar
     - com.yahoo.datasketches-sketches-core-0.8.3.jar
- * Apache Commons 
+ * Apache Commons
     - commons-beanutils-commons-beanutils-1.7.0.jar
     - commons-beanutils-commons-beanutils-core-1.8.0.jar
     - commons-cli-commons-cli-1.2.jar
@@ -461,6 +461,12 @@ The Apache Software License, Version 2.0
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Flatbuffers Java
     - com.google.flatbuffers-flatbuffers-java-1.9.0.jar
+  * Apache Jclouds
+    - org.apache.jclouds-allblobstore-2.2.0-SNAPSHOT.jar
+  * Google Guice Core Library
+    - com.google.inject.guice-3.0.jar
+    - com.google.inject.extensions:guice-multibindings-3.0.jar
+    - com.google.inject.extensions:guice-assistedinject-3.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
new file mode 100644
index 0000000000..8aa1786c90
--- /dev/null
+++ b/jclouds-shaded/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>jclouds-shaded</artifactId>
+  <name>Apache Pulsar :: Jclouds shaded</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jclouds</groupId>
+      <artifactId>jclouds-allblobstore</artifactId>
+      <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>jclouds-snapshots</id>
+      <url>https://repository.apache.org/content/repositories/snapshots</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <includes>
+                  <include>com.google.code.gson:gson</include>
+                  <include>com.google.guava:guava</include>
+                  <include>org.apache.jclouds:*</include>
+                  <include>org.apache.jclouds.api:*</include>
+                  <include>org.apache.jclouds.common:*</include>
+                  <include>org.apache.jclouds.provider:*</include>
+                  <include>com.google.inject.extensions:guice-assistedinject</include>
+                  <include>com.google.inject:guice</include>
+                  <include>com.google.inject.extensions:guice-multibindings</include>
+                </includes>
+              </artifactSet>
+
+              <relocations>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.shaded.com.google</shadedPattern>
+                </relocation>
+              </relocations>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pom.xml b/pom.xml
index a6d645379b..88c2591ad9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,8 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-zookeeper</module>
     <module>pulsar-log4j2-appender</module>
     <module>protobuf-shaded</module>
+    <!-- jclouds shaded for gson conflict: https://issues.apache.org/jira/browse/JCLOUDS-1166 -->
+    <module>jclouds-shaded</module>
 
     <!-- functions-related modules -->
     <module>pulsar-functions</module>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index d505c7a881..ac264bdf5b 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -127,7 +127,7 @@
 
     <dependency>
       <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk-s3</artifactId>
+      <artifactId>aws-java-sdk-core</artifactId>
     </dependency>
 
     <!-- functions related dependencies (begin) -->
@@ -273,6 +273,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>jclouds-shaded</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a341afba7..c0aa9d2cfb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -61,7 +61,7 @@
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -657,8 +657,8 @@ public LedgerOffloader getManagedLedgerOffloader() {
     public synchronized LedgerOffloader 
createManagedLedgerOffloader(ServiceConfiguration conf)
             throws PulsarServerException {
         if (conf.getManagedLedgerOffloadDriver() != null
-            && 
conf.getManagedLedgerOffloadDriver().equalsIgnoreCase(S3ManagedLedgerOffloader.DRIVER_NAME))
 {
-            return S3ManagedLedgerOffloader.create(conf, 
getOffloaderScheduler(conf));
+            && 
BlobStoreManagedLedgerOffloader.driverSupported(conf.getManagedLedgerOffloadDriver()))
 {
+                return BlobStoreManagedLedgerOffloader.create(conf, 
getOffloaderScheduler(conf));
         } else {
             return NullLedgerOffloader.INSTANCE;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
similarity index 68%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
index e55e61bf54..19fac5902a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedInputStreamImpl.java
@@ -18,27 +18,22 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-
-import java.io.InputStream;
 import java.io.IOException;
-
+import java.io.InputStream;
 import org.apache.pulsar.broker.offload.BackedInputStream;
-import 
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import 
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3BackedInputStreamImpl extends BackedInputStream {
-    private static final Logger log = 
LoggerFactory.getLogger(S3BackedInputStreamImpl.class);
+public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);
 
-    private final AmazonS3 s3client;
+    private final BlobStore blobStore;
     private final String bucket;
     private final String key;
     private final VersionCheck versionCheck;
@@ -50,10 +45,10 @@
     private long bufferOffsetStart;
     private long bufferOffsetEnd;
 
-    public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String 
key,
-                                   VersionCheck versionCheck,
-                                   long objectLen, int bufferSize) {
-        this.s3client = s3client;
+    public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, 
String key,
+                                          VersionCheck versionCheck,
+                                          long objectLen, int bufferSize) {
+        this.blobStore = blobStore;
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
@@ -76,26 +71,24 @@ private boolean refillBufferIfNeeded() throws IOException {
             long startRange = cursor;
             long endRange = Math.min(cursor + bufferSize - 1,
                                      objectLen - 1);
-            GetObjectRequest req = new GetObjectRequest(bucket, key)
-                .withRange(startRange, endRange);
-            log.debug("Reading range {}-{} from {}/{}", startRange, endRange, 
bucket, key);
-            try (S3Object obj = s3client.getObject(req)) {
-                versionCheck.check(key, obj.getObjectMetadata());
-
-                Long[] range = obj.getObjectMetadata().getContentRange();
-                long bytesRead = range[1] - range[0] + 1;
 
-                buffer.clear();
-                bufferOffsetStart = range[0];
-                bufferOffsetEnd = range[1];
-                InputStream s = obj.getObjectContent();
-                int bytesToCopy = (int)bytesRead;
-                while (bytesToCopy > 0) {
-                    bytesToCopy -= buffer.writeBytes(s, bytesToCopy);
+            try {
+                Blob blob = blobStore.getBlob(bucket, key, new 
GetOptions().range(startRange, endRange));
+                versionCheck.check(key, blob);
+
+                try (InputStream stream = blob.getPayload().openStream()) {
+                    buffer.clear();
+                    bufferOffsetStart = startRange;
+                    bufferOffsetEnd = endRange;
+                    long bytesRead = endRange - startRange + 1;
+                    int bytesToCopy = (int) bytesRead;
+                    while (bytesToCopy > 0) {
+                        bytesToCopy -= buffer.writeBytes(stream, bytesToCopy);
+                    }
+                    cursor += buffer.readableBytes();
                 }
-                cursor += buffer.readableBytes();
-            } catch (AmazonClientException e) {
-                throw new IOException("Error reading from S3", e);
+            } catch (Throwable e) {
+                throw new IOException("Error reading from BlobStore", e);
             }
         }
         return true;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
similarity index 82%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
index 08b5ea6c37..36b382be96 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,13 +19,8 @@
 package org.apache.pulsar.broker.offload.impl;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,7 +28,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -42,18 +36,18 @@
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-
+import org.apache.pulsar.broker.offload.BackedInputStream;
 import org.apache.pulsar.broker.offload.OffloadIndexBlock;
 import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.broker.offload.OffloadIndexEntry;
-import org.apache.pulsar.broker.offload.BackedInputStream;
-import 
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
-
+import 
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3BackedReadHandleImpl implements ReadHandle {
-    private static final Logger log = 
LoggerFactory.getLogger(S3BackedReadHandleImpl.class);
+public class BlobStoreBackedReadHandleImpl implements ReadHandle {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
 
     private final long ledgerId;
     private final OffloadIndexBlock index;
@@ -61,9 +55,9 @@
     private final DataInputStream dataStream;
     private final ExecutorService executor;
 
-    private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
-                                   BackedInputStream inputStream,
-                                   ExecutorService executor) {
+    private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock 
index,
+                                          BackedInputStream inputStream,
+                                          ExecutorService executor) {
         this.ledgerId = ledgerId;
         this.index = index;
         this.inputStream = inputStream;
@@ -189,22 +183,19 @@ public boolean isClosed() {
     }
 
     public static ReadHandle open(ScheduledExecutorService executor,
-                                  AmazonS3 s3client, String bucket, String 
key, String indexKey,
+                                  BlobStore blobStore, String bucket, String 
key, String indexKey,
                                   VersionCheck versionCheck,
                                   long ledgerId, int readBufferSize)
             throws AmazonClientException, IOException {
-        GetObjectRequest req = new GetObjectRequest(bucket, indexKey);
-        try (S3Object obj = s3client.getObject(req)) {
-            versionCheck.check(indexKey, obj.getObjectMetadata());
-
-            OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create();
-            OffloadIndexBlock index = 
indexBuilder.fromStream(obj.getObjectContent());
-
-            BackedInputStream inputStream = new 
S3BackedInputStreamImpl(s3client, bucket, key,
-                                                                          
versionCheck,
-                                                                          
index.getDataObjectLength(),
-                                                                          
readBufferSize);
-            return new S3BackedReadHandleImpl(ledgerId, index, inputStream, 
executor);
-        }
+        Blob blob = blobStore.getBlob(bucket, indexKey);
+        versionCheck.check(indexKey, blob);
+        OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create();
+        OffloadIndexBlock index = 
indexBuilder.fromStream(blob.getPayload().openStream());
+
+        BackedInputStream inputStream = new 
BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
+            versionCheck,
+            index.getDataObjectLength(),
+            readBufferSize);
+        return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, 
executor);
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
similarity index 55%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
index ec74d2762c..528ba28348 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloader.java
@@ -18,24 +18,17 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -47,21 +40,48 @@
 import org.apache.pulsar.broker.offload.OffloadIndexBlock;
 import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
+import org.jclouds.Constants;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.BlobBuilder;
+import org.jclouds.blobstore.domain.MultipartPart;
+import org.jclouds.blobstore.domain.MultipartUpload;
+import org.jclouds.blobstore.options.PutOptions;
+import org.jclouds.domain.Location;
+import org.jclouds.domain.LocationBuilder;
+import org.jclouds.domain.LocationScope;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.jclouds.s3.reference.S3Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class S3ManagedLedgerOffloader implements LedgerOffloader {
-    private static final Logger log = 
LoggerFactory.getLogger(S3ManagedLedgerOffloader.class);
+public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
 
-    public static final String DRIVER_NAME = "S3";
+    public static final String[] DRIVER_NAMES = {"S3", "aws-s3", 
"google-cloud-storage"};
 
     static final String METADATA_FORMAT_VERSION_KEY = 
"S3ManagedLedgerOffloaderFormatVersion";
     static final String METADATA_SOFTWARE_VERSION_KEY = 
"S3ManagedLedgerOffloaderSoftwareVersion";
     static final String METADATA_SOFTWARE_GITSHA_KEY = 
"S3ManagedLedgerOffloaderSoftwareGitSha";
     static final String CURRENT_VERSION = String.valueOf(1);
 
-    private final VersionCheck VERSION_CHECK = (key, metadata) -> {
-        String version = 
metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY);
+    public static boolean driverSupported(String driver) {
+        return Arrays.stream(DRIVER_NAMES).anyMatch(d -> 
d.equalsIgnoreCase(driver));
+    }
+
+    private static void addVersionInfo(BlobBuilder blobBuilder) {
+        blobBuilder.userMetadata(ImmutableMap.of(
+            METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION,
+            METADATA_SOFTWARE_VERSION_KEY, 
PulsarBrokerVersionStringUtils.getNormalizedVersionString(),
+            METADATA_SOFTWARE_GITSHA_KEY, 
PulsarBrokerVersionStringUtils.getGitSha()));
+    }
+
+    private final VersionCheck VERSION_CHECK = (key, blob) -> {
+        // NOTE all metadata in jclouds comes out as lowercase, in an effort 
to normalize the providers
+        String version = 
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
         if (version == null || !version.equals(CURRENT_VERSION)) {
             throw new IOException(String.format("Invalid object version %s for 
%s, expect %s",
                                                 version, key, 
CURRENT_VERSION));
@@ -69,15 +89,21 @@
     };
 
     private final OrderedScheduler scheduler;
-    private final AmazonS3 s3client;
+
+    // container in jclouds
     private final String bucket;
     // max block size for each data block.
     private int maxBlockSize;
     private final int readBufferSize;
 
-    public static S3ManagedLedgerOffloader create(ServiceConfiguration conf,
-                                                  OrderedScheduler scheduler)
+    private BlobStoreContext context;
+    private BlobStore blobStore;
+    Location location = null;
+
+    public static BlobStoreManagedLedgerOffloader create(ServiceConfiguration 
conf,
+                                                         OrderedScheduler 
scheduler)
             throws PulsarServerException {
+        String driver = conf.getManagedLedgerOffloadDriver();
         String region = conf.getS3ManagedLedgerOffloadRegion();
         String bucket = conf.getS3ManagedLedgerOffloadBucket();
         String endpoint = conf.getS3ManagedLedgerOffloadServiceEndpoint();
@@ -96,23 +122,66 @@ public static S3ManagedLedgerOffloader 
create(ServiceConfiguration conf,
             throw new 
PulsarServerException("s3ManagedLedgerOffloadMaxBlockSizeInBytes cannot be less 
than 5MB");
         }
 
-        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+        return new BlobStoreManagedLedgerOffloader(driver, bucket, scheduler, 
maxBlockSize, readBufferSize, endpoint, region);
+    }
+
+    // build context for jclouds BlobStoreContext
+    BlobStoreManagedLedgerOffloader(String driver, String container, 
OrderedScheduler scheduler,
+                                    int maxBlockSize, int readBufferSize, 
String endpoint, String region) {
+        this.scheduler = scheduler;
+        this.readBufferSize = readBufferSize;
+
+        this.bucket = container;
+        this.maxBlockSize = maxBlockSize;
+
+        Properties overrides = new Properties();
+        // This property controls the number of parts being uploaded in 
parallel.
+        overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+        overrides.setProperty("jclouds.mpu.parts.size", 
Integer.toString(maxBlockSize));
+        overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+        overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
+
+        ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+
+        AWSCredentials credentials = null;
+        try {
+            DefaultAWSCredentialsProviderChain creds = 
DefaultAWSCredentialsProviderChain.getInstance();
+            credentials = creds.getCredentials();
+        } catch (Exception e) {
+            log.error("Exception when get credentials for s3 ", e);
+        }
+
+        String id = "accesskey";
+        String key = "secretkey";
+        if (credentials != null) {
+            id = credentials.getAWSAccessKeyId();
+            key = credentials.getAWSSecretKey();
+        }
+        contextBuilder.credentials(id, key);
+
         if (!Strings.isNullOrEmpty(endpoint)) {
-            builder.setEndpointConfiguration(new 
EndpointConfiguration(endpoint, region));
-            builder.setPathStyleAccessEnabled(true);
-        } else {
-            builder.setRegion(region);
+            contextBuilder.endpoint(endpoint);
+            
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+        }
+        if (!Strings.isNullOrEmpty(region)) {
+            this.location = new 
LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
         }
-        return new S3ManagedLedgerOffloader(builder.build(), bucket, 
scheduler, maxBlockSize, readBufferSize);
+
+        log.info("Constructor driver: {}, host: {}, container: {}, region: {} 
",  driver, endpoint, bucket, region);
+
+        contextBuilder.overrides(overrides);
+        this.context = contextBuilder.buildView(BlobStoreContext.class);
+        this.blobStore = context.getBlobStore();
     }
 
-    S3ManagedLedgerOffloader(AmazonS3 s3client, String bucket, 
OrderedScheduler scheduler,
-                             int maxBlockSize, int readBufferSize) {
-        this.s3client = s3client;
-        this.bucket = bucket;
+    // build context for jclouds BlobStoreContext
+    BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container, 
OrderedScheduler scheduler,
+                                    int maxBlockSize, int readBufferSize) {
         this.scheduler = scheduler;
-        this.maxBlockSize = maxBlockSize;
         this.readBufferSize = readBufferSize;
+        this.bucket = container;
+        this.maxBlockSize = maxBlockSize;
+        this.blobStore = blobStore;
     }
 
     static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -141,15 +210,15 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), 
uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), 
uuid);
 
-            ObjectMetadata dataMetadata = new ObjectMetadata();
-            addVersionInfo(dataMetadata);
-
-            InitiateMultipartUploadRequest dataBlockReq = new 
InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata);
-            InitiateMultipartUploadResult dataBlockRes = null;
+            MultipartUpload mpu = null;
+            List<MultipartPart> parts = Lists.newArrayList();
 
             // init multi part upload for data block.
             try {
-                dataBlockRes = s3client.initiateMultipartUpload(dataBlockReq);
+                BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+                addVersionInfo(blobBuilder);
+                Blob blob = blobBuilder.build();
+                mpu = blobStore.initiateMultipartUpload(bucket, 
blob.getMetadata(), new PutOptions());
             } catch (Throwable t) {
                 promise.completeExceptionally(t);
                 return;
@@ -161,7 +230,6 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
                 long startEntry = 0;
                 int partId = 1;
                 long entryBytesWritten = 0;
-                List<PartETag> etags = new LinkedList<>();
                 while (startEntry <= readHandle.getLastAddConfirmed()) {
                     int blockSize = BlockAwareSegmentInputStreamImpl
                         .calculateBlockSize(maxBlockSize, readHandle, 
startEntry, entryBytesWritten);
@@ -169,15 +237,13 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
                     try (BlockAwareSegmentInputStream blockStream = new 
BlockAwareSegmentInputStreamImpl(
                         readHandle, startEntry, blockSize)) {
 
-                        UploadPartResult uploadRes = s3client.uploadPart(
-                            new UploadPartRequest()
-                                .withBucketName(bucket)
-                                .withKey(dataBlockKey)
-                                .withUploadId(dataBlockRes.getUploadId())
-                                .withInputStream(blockStream)
-                                .withPartSize(blockSize)
-                                .withPartNumber(partId));
-                        etags.add(uploadRes.getPartETag());
+                        Payload partPayload = 
Payloads.newInputStreamPayload(blockStream);
+                        
partPayload.getContentMetadata().setContentLength((long)blockSize);
+                        
partPayload.getContentMetadata().setContentType("application/octet-stream");
+                        parts.add(blobStore.uploadMultipartPart(mpu, partId, 
partPayload));
+                        log.debug("UploadMultipartPart. container: {}, 
blobName: {}, partId: {}, mpu: {}",
+                            bucket, dataBlockKey, partId, mpu.id());
+
                         indexBuilder.addBlock(startEntry, partId, blockSize);
 
                         if (blockStream.getEndEntryId() != -1) {
@@ -193,17 +259,16 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
                     dataObjectLength += blockSize;
                 }
 
-                s3client.completeMultipartUpload(new 
CompleteMultipartUploadRequest()
-                    .withBucketName(bucket).withKey(dataBlockKey)
-                    .withUploadId(dataBlockRes.getUploadId())
-                    .withPartETags(etags));
+                blobStore.completeMultipartUpload(mpu, parts);
+                mpu = null;
             } catch (Throwable t) {
                 try {
-                    s3client.abortMultipartUpload(
-                        new AbortMultipartUploadRequest(bucket, dataBlockKey, 
dataBlockRes.getUploadId()));
+                    if (mpu != null) {
+                        blobStore.abortMultipartUpload(mpu);
+                    }
                 } catch (Throwable throwable) {
                     log.error("Failed abortMultipartUpload in bucket - {} with 
key - {}, uploadId - {}.",
-                        bucket, dataBlockKey, dataBlockRes.getUploadId(), 
throwable);
+                        bucket, dataBlockKey, mpu.id(), throwable);
                 }
                 promise.completeExceptionally(t);
                 return;
@@ -213,19 +278,22 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
             try (OffloadIndexBlock index = 
indexBuilder.withDataObjectLength(dataObjectLength).build();
                  OffloadIndexBlock.IndexInputStream indexStream = 
index.toStream()) {
                 // write the index block
-                ObjectMetadata metadata = new ObjectMetadata();
-                metadata.setContentLength(indexStream.getStreamSize());
-                addVersionInfo(metadata);
-
-                s3client.putObject(new PutObjectRequest(
-                    bucket,
-                    indexBlockKey,
-                    indexStream,
-                    metadata));
+                BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+                addVersionInfo(blobBuilder);
+                Payload indexPayload = 
Payloads.newInputStreamPayload(indexStream);
+                
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
+                
indexPayload.getContentMetadata().setContentType("application/octet-stream");
+
+                Blob blob = blobBuilder
+                    .payload(indexPayload)
+                    .contentLength((long)indexStream.getStreamSize())
+                    .build();
+
+                blobStore.putBlob(bucket, blob);
                 promise.complete(null);
             } catch (Throwable t) {
                 try {
-                    s3client.deleteObject(bucket, dataBlockKey);
+                    blobStore.removeBlob(bucket, dataBlockKey);
                 } catch (Throwable throwable) {
                     log.error("Failed deleteObject in bucket - {} with key - 
{}.",
                         bucket, dataBlockKey, throwable);
@@ -244,35 +312,31 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
         String indexKey = indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).submit(() -> {
                 try {
-                    
promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                 s3client,
+                    
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                                                                 blobStore,
                                                                  bucket, key, 
indexKey,
                                                                  VERSION_CHECK,
                                                                  ledgerId, 
readBufferSize));
                 } catch (Throwable t) {
+                    log.error("Failed readOffloaded: ", t);
                     promise.completeExceptionally(t);
                 }
             });
         return promise;
     }
 
-    private static void addVersionInfo(ObjectMetadata metadata) {
-        metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, 
CURRENT_VERSION);
-        metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY,
-                                       
PulsarBrokerVersionStringUtils.getNormalizedVersionString());
-        metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, 
PulsarBrokerVersionStringUtils.getGitSha());
-    }
+
 
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(ledgerId).submit(() -> {
             try {
-                s3client.deleteObjects(new DeleteObjectsRequest(bucket)
-                    .withKeys(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
+                blobStore.removeBlobs(bucket,
+                    ImmutableList.of(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
                 promise.complete(null);
             } catch (Throwable t) {
-                log.error("Failed delete s3 Object ", t);
+                log.error("Failed delete Blob", t);
                 promise.completeExceptionally(t);
             }
         });
@@ -281,7 +345,7 @@ private static void addVersionInfo(ObjectMetadata metadata) 
{
     }
 
     public interface VersionCheck {
-        void check(String key, ObjectMetadata md) throws IOException;
+        void check(String key, Blob blob) throws IOException;
     }
 }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
similarity index 61%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
index 45bca52d59..bde4ddee9a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreBackedInputStreamTest.java
@@ -18,29 +18,35 @@
  */
 package org.apache.pulsar.broker.offload;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.spy;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-
 import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl;
-
+import org.apache.pulsar.broker.offload.impl.BlobStoreBackedInputStreamImpl;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.options.GetOptions;
+import org.jclouds.io.Payload;
+import org.jclouds.io.Payloads;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
-class S3BackedInputStreamTest extends S3TestBase {
+class BlobStoreBackedInputStreamTest extends BlobStoreTestBase {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreBackedInputStreamTest.class);
+
     class RandomInputStream extends InputStream {
         final Random r;
         int bytesRemaining;
@@ -85,16 +91,21 @@ private void assertStreamsMatchByBytes(InputStream a, 
InputStream b) throws Exce
 
     @Test
     public void testReadingFullObject() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testReadingFull";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
         RandomInputStream toCompare = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, 
BUCKET, objectKey,
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
         assertStreamsMatch(toTest, toCompare);
@@ -102,24 +113,29 @@ public void testReadingFullObject() throws Exception {
 
     @Test
     public void testReadingFullObjectByBytes() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testReadingFull2";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
         RandomInputStream toCompare = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, 
BUCKET, objectKey,
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
         assertStreamsMatchByBytes(toTest, toCompare);
     }
 
     @Test(expectedExceptions = IOException.class)
-    public void testErrorOnS3Read() throws Exception {
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, 
BUCKET, "doesn't exist",
+    public void testErrorOnRead() throws Exception {
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, "doesn't exist",
                                                                  (key, md) -> 
{},
                                                                  1234, 1000);
         toTest.read();
@@ -128,7 +144,7 @@ public void testErrorOnS3Read() throws Exception {
 
     @Test
     public void testSeek() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeek";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
@@ -141,11 +157,16 @@ public void testSeek() throws Exception {
             seeks.put(seek, stream);
         }
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, 
BUCKET, objectKey,
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
         for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
@@ -156,16 +177,23 @@ public void testSeek() throws Exception {
 
     @Test
     public void testSeekWithinCurrent() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeekWithinCurrent";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
+
+        //BlobStore spiedBlobStore = spy(blobStore);
+        BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
 
-        AmazonS3 spiedClient = spy(s3client);
-        BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient, 
BUCKET, objectKey,
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(spiedBlobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
 
@@ -193,20 +221,26 @@ public void testSeekWithinCurrent() throws Exception {
             Assert.assertEquals(thirdSeek.read(), toTest.read());
         }
 
-        verify(spiedClient, times(1)).getObject(anyObject());
+        verify(spiedBlobStore, times(1))
+            .getBlob(Mockito.eq(BUCKET), Mockito.eq(objectKey), 
Matchers.<GetOptions>anyObject());
     }
 
     @Test
     public void testSeekForward() throws Exception {
-        String objectKey = "foobar";
+        String objectKey = "testSeekForward";
         int objectSize = 12345;
         RandomInputStream toWrite = new RandomInputStream(0, objectSize);
 
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(objectSize);
-        s3client.putObject(BUCKET, objectKey, toWrite, metadata);
+        Payload payload = Payloads.newInputStreamPayload(toWrite);
+        payload.getContentMetadata().setContentLength((long)objectSize);
+        Blob blob = blobStore.blobBuilder(objectKey)
+            .payload(payload)
+            .contentLength((long)objectSize)
+            .build();
+        String ret = blobStore.putBlob(BUCKET, blob);
+        log.debug("put blob: {} in Bucket: {}, in blobStore, result: {}", 
objectKey, BUCKET, ret);
 
-        BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, 
BUCKET, objectKey,
+        BackedInputStream toTest = new 
BlobStoreBackedInputStreamImpl(blobStore, BUCKET, objectKey,
                                                                  (key, md) -> 
{},
                                                                  objectSize, 
1000);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
similarity index 53%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
index f2ea6c4b33..d1474f95fa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/BlobStoreTestBase.java
@@ -18,27 +18,39 @@
  */
 package org.apache.pulsar.broker.offload;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 
-public class S3TestBase {
+public class BlobStoreTestBase {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreTestBase.class);
+
     public final static String BUCKET = "pulsar-unittest";
 
-    protected AmazonS3 s3client = null;
+    protected BlobStoreContext context = null;
+    protected BlobStore blobStore = null;
 
     @BeforeMethod
     public void start() throws Exception {
-        if (Boolean.parseBoolean(System.getProperty("testRealAWS", "false"))) {
-            // To use this, ~/.aws must be configured with credentials and a 
default region
-            s3client = AmazonS3ClientBuilder.standard().build();
-        } else {
-            s3client = new S3Mock();
+        context = 
ContextBuilder.newBuilder("transient").build(BlobStoreContext.class);
+        blobStore = context.getBlobStore();
+        boolean create = blobStore.createContainerInLocation(null, BUCKET);
+
+        log.debug("TestBase Create Bucket: {}, in blobStore, result: {}", 
BUCKET, create);
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        if (blobStore != null) {
+            blobStore.deleteContainer(BUCKET);
         }
 
-        if (!s3client.doesBucketExistV2(BUCKET)) {
-            s3client.createBucket(BUCKET);
+        if (context != null) {
+            context.close();
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
deleted file mode 100644
index 4bfc1401a8..0000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.offload;
-
-import com.amazonaws.services.s3.AbstractAmazonS3;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.amazonaws.services.s3.model.Bucket;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectResult;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
-import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.amazonaws.services.s3.model.UploadPartResult;
-
-import com.google.common.collect.ComparisonChain;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.stream.Collectors;
-
-/**
- * Minimal mock for amazon client.
- * If making any changes, validate they behave the same as S3 by running all 
S3 tests with -DtestRealAWS=true
- */
-class S3Mock extends AbstractAmazonS3 {
-    @Override
-    public boolean doesBucketExistV2(String bucketName) {
-        return buckets.containsKey(bucketName);
-    }
-
-    @Override
-    public boolean doesObjectExist(String bucketName, String objectName) {
-        return buckets.containsKey(bucketName) && 
getBucket(bucketName).hasObject(objectName);
-    }
-
-    @Override
-    public Bucket createBucket(String bucketName) {
-        return buckets.computeIfAbsent(bucketName, (k) -> new MockBucket(k));
-    }
-
-    private MockBucket getBucket(String bucketName) throws AmazonS3Exception {
-        MockBucket bucket = buckets.get(bucketName);
-        if (bucket != null) {
-            return bucket;
-        } else {
-            throw new AmazonS3Exception("NoSuchBucket: Bucket doesn't exist");
-        }
-    }
-
-    @Override
-    public PutObjectResult putObject(PutObjectRequest putObjectRequest)
-            throws AmazonS3Exception {
-        return 
getBucket(putObjectRequest.getBucketName()).putObject(putObjectRequest);
-    }
-
-    @Override
-    public S3Object getObject(GetObjectRequest getObjectRequest) {
-        return 
getBucket(getObjectRequest.getBucketName()).getObject(getObjectRequest);
-    }
-
-    @Override
-    public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest 
getObjectMetadataRequest)
-            throws AmazonS3Exception {
-        return 
getBucket(getObjectMetadataRequest.getBucketName()).getObjectMetadata(getObjectMetadataRequest);
-    }
-
-    @Override
-    public void deleteObject(DeleteObjectRequest deleteObjectRequest)
-            throws AmazonS3Exception {
-        
getBucket(deleteObjectRequest.getBucketName()).deleteObject(deleteObjectRequest.getKey());
-    }
-
-    @Override
-    public DeleteObjectsResult deleteObjects(DeleteObjectsRequest 
deleteObjectsRequest)
-            throws AmazonS3Exception {
-        List<DeleteObjectsResult.DeletedObject> results = 
deleteObjectsRequest.getKeys().stream().map((k) -> {
-                
getBucket(deleteObjectsRequest.getBucketName()).deleteObject(k.getKey());
-                DeleteObjectsResult.DeletedObject res = new 
DeleteObjectsResult.DeletedObject();
-                res.setKey(k.getKey());
-                return res;
-            }).collect(Collectors.toList());
-        return new DeleteObjectsResult(results);
-    }
-
-    @Override
-    public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
-            throws AmazonS3Exception {
-        S3Object from = getObject(new 
GetObjectRequest(copyObjectRequest.getSourceBucketName(),
-                                                       
copyObjectRequest.getSourceKey()));
-        ObjectMetadata newMetadata = copyObjectRequest.getNewObjectMetadata();
-        if (newMetadata == null) {
-            newMetadata = from.getObjectMetadata();
-        }
-        
newMetadata.setContentLength(from.getObjectMetadata().getContentLength());
-        putObject(new 
PutObjectRequest(copyObjectRequest.getDestinationBucketName(),
-                                       copyObjectRequest.getDestinationKey(),
-                                       from.getObjectContent(),
-                                       newMetadata));
-        return new CopyObjectResult();
-    }
-
-    @Override
-    public InitiateMultipartUploadResult 
initiateMultipartUpload(InitiateMultipartUploadRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).initMultipart(request);
-    }
-
-    @Override
-    public UploadPartResult uploadPart(UploadPartRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).uploadPart(request);
-    }
-
-    @Override
-    public CompleteMultipartUploadResult 
completeMultipartUpload(CompleteMultipartUploadRequest request)
-            throws AmazonS3Exception {
-        return getBucket(request.getBucketName()).completeMultipart(request);
-    }
-
-    ConcurrentHashMap<String, MockBucket> buckets = new ConcurrentHashMap<>();
-
-    static class MockBucket extends Bucket {
-        ConcurrentHashMap<String, MockObject> objects = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, MockMultipart> inprogressMultipart = new 
ConcurrentHashMap<>();
-
-        MockBucket(String name) {
-            super(name);
-        }
-
-        boolean hasObject(String key) {
-            return objects.containsKey(key);
-        }
-
-        PutObjectResult putObject(PutObjectRequest putObjectRequest) throws 
AmazonS3Exception {
-            byte[] bytes = streamToBytes(putObjectRequest.getInputStream(),
-                                         
(int)putObjectRequest.getMetadata().getContentLength());
-            objects.put(putObjectRequest.getKey(),
-                        new MockObject(putObjectRequest.getMetadata(), bytes));
-            return new PutObjectResult();
-        }
-
-        S3Object getObject(GetObjectRequest getObjectRequest) throws 
AmazonS3Exception {
-            MockObject obj = objects.get(getObjectRequest.getKey());
-            if (obj == null) {
-                throw new AmazonS3Exception("Object doesn't exist");
-            }
-
-            S3Object s3obj = new S3Object();
-            s3obj.setBucketName(getObjectRequest.getBucketName());
-            s3obj.setKey(getObjectRequest.getKey());
-
-            if (getObjectRequest.getRange() != null) {
-                long[] range = getObjectRequest.getRange();
-                int size = (int)(range[1] - range[0] + 1);
-                ObjectMetadata metadata = obj.metadata.clone();
-                metadata.setHeader("Content-Range",
-                                   String.format("bytes %d-%d/%d",
-                                                 range[0], range[1], size));
-                s3obj.setObjectMetadata(metadata);
-                s3obj.setObjectContent(new ByteArrayInputStream(obj.data, 
(int)range[0], size));
-                return s3obj;
-            } else {
-                s3obj.setObjectMetadata(obj.metadata);
-                s3obj.setObjectContent(new ByteArrayInputStream(obj.data));
-                return s3obj;
-            }
-        }
-
-        void deleteObject(String key) {
-            objects.remove(key);
-        }
-
-        ObjectMetadata getObjectMetadata(GetObjectMetadataRequest 
getObjectMetadataRequest)
-                throws AmazonS3Exception {
-            MockObject obj = objects.get(getObjectMetadataRequest.getKey());
-            if (obj == null) {
-                throw new AmazonS3Exception("Object doesn't exist");
-            }
-            return obj.metadata;
-        }
-
-        InitiateMultipartUploadResult 
initMultipart(InitiateMultipartUploadRequest request)
-                throws AmazonS3Exception {
-            String uploadId = UUID.randomUUID().toString();
-            inprogressMultipart.put(uploadId, new 
MockMultipart(request.getKey(),
-                                                                
request.getObjectMetadata()));
-            InitiateMultipartUploadResult result = new 
InitiateMultipartUploadResult();
-            result.setBucketName(request.getBucketName());
-            result.setKey(request.getKey());
-            result.setUploadId(uploadId);
-            return result;
-        }
-
-        MockMultipart getMultipart(String uploadId, String key) throws 
AmazonS3Exception {
-            MockMultipart multi = inprogressMultipart.get(uploadId);
-            if (multi == null) {
-                throw new AmazonS3Exception("No such upload " + uploadId);
-            }
-            if (!multi.key.equals(key)) {
-                throw new AmazonS3Exception("Wrong key for upload " + uploadId
-                                            + ", expected " + key
-                                            + ", got " + multi.key);
-            }
-            return multi;
-        }
-
-        UploadPartResult uploadPart(UploadPartRequest request)
-                throws AmazonS3Exception {
-            MockMultipart multi = getMultipart(request.getUploadId(), 
request.getKey());
-            byte[] bytes = streamToBytes(request.getInputStream(), 
(int)request.getPartSize());
-            UploadPartResult result = new UploadPartResult();
-            result.setPartNumber(request.getPartNumber());
-            result.setETag(multi.addPart(request.getPartNumber(), bytes));
-            return result;
-        }
-
-        CompleteMultipartUploadResult 
completeMultipart(CompleteMultipartUploadRequest request)
-                throws AmazonS3Exception {
-            MockMultipart multi = getMultipart(request.getUploadId(), 
request.getKey());
-            inprogressMultipart.remove(request.getUploadId());
-            objects.put(request.getKey(), 
multi.complete(request.getPartETags()));
-            CompleteMultipartUploadResult result = new 
CompleteMultipartUploadResult();
-            result.setBucketName(request.getBucketName());
-            result.setKey(request.getKey());
-            return result;
-        }
-    }
-
-    private static byte[] streamToBytes(InputStream data, int length) throws 
AmazonS3Exception {
-        byte[] bytes = new byte[length];
-        try {
-            for (int i = 0; i < length; i++) {
-                bytes[i] = (byte)data.read();
-            }
-        } catch (IOException ioe) {
-            throw new AmazonS3Exception("Error loading data", ioe);
-        }
-        return bytes;
-    }
-
-    static class MockObject {
-        final ObjectMetadata metadata;
-        final byte[] data;
-        final Map<Integer, long[]> partRanges;
-
-
-        MockObject(ObjectMetadata metadata, byte[] data) {
-            this(metadata, data, null);
-        }
-
-        MockObject(ObjectMetadata metadata, byte[] data, Map<Integer, long[]> 
partRanges) {
-            this.metadata = metadata;
-            this.data = data;
-            this.partRanges = partRanges;
-        }
-
-    }
-
-    static class MockMultipart {
-        final String key;
-        final ObjectMetadata metadata;
-        final ConcurrentSkipListMap<PartETag, byte[]> parts = new 
ConcurrentSkipListMap<>(
-                (etag1, etag2) -> 
ComparisonChain.start().compare(etag1.getPartNumber(),
-                                                                  
etag2.getPartNumber()).result());
-
-        MockMultipart(String key, ObjectMetadata metadata) {
-            this.key = key;
-            this.metadata = metadata;
-        }
-
-        String addPart(int partNumber, byte[] bytes) {
-            String etag = UUID.randomUUID().toString();
-            parts.put(new PartETag(partNumber, etag), bytes);
-            return etag;
-        }
-
-        MockObject complete(List<PartETag> tags) throws AmazonS3Exception {
-            if (parts.size() != tags.size()
-                || !parts.keySet().containsAll(tags)) {
-                throw new AmazonS3Exception("Tags don't match uploaded parts");
-            }
-
-            int totalSize = parts.values().stream().map(v -> 
v.length).reduce(0, (acc, v) -> acc + v);
-            byte[] full = new byte[totalSize];
-
-            Map<Integer, long[]> partRanges = new HashMap<>();
-            int start = 0;
-            for (Map.Entry<PartETag, byte[]> e : parts.entrySet()) {
-                int partLength = e.getValue().length;
-                System.arraycopy(e.getValue(), 0, full, start, partLength);
-                partRanges.put(e.getKey().getPartNumber(),
-                               new long[] { start, start + partLength - 1 });
-                start += partLength;
-            }
-            metadata.setContentLength(totalSize);
-            return new MockObject(metadata, full, partRanges);
-        }
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
similarity index 71%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
index 7b9d9a2fa9..19463d2fc0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -18,24 +18,23 @@
  */
 package org.apache.pulsar.broker.offload.impl;
 
-import static 
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey;
-import static 
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey;
+import static 
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.dataBlockOffloadKey;
+import static 
org.apache.pulsar.broker.offload.impl.BlobStoreManagedLedgerOffloader.indexBlockOffloadKey;
+import static org.mockito.AdditionalAnswers.delegatesTo;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import java.lang.reflect.Method;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -51,21 +50,25 @@
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.offload.S3TestBase;
-import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl;
-import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.BlobStoreTestBase;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.options.CopyOptions;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
-class S3ManagedLedgerOffloaderTest extends S3TestBase {
+class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
+    private static final Logger log = 
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class);
+
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
     final OrderedScheduler scheduler;
     final MockBookKeeper bk;
 
-    S3ManagedLedgerOffloaderTest() throws Exception {
+    BlobStoreManagedLedgerOffloaderTest() throws Exception {
         scheduler = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
         bk = new 
MockBookKeeper(MockedPulsarServiceBaseTest.createMockZooKeeper());
     }
@@ -114,31 +117,32 @@ private ReadHandle buildReadHandle(int maxBlockSize, int 
blockCount) throws Exce
 
     @Test
     public void testHappyCase() throws Exception {
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                 DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         offloader.offload(buildReadHandle(), UUID.randomUUID(), new 
HashMap<>()).get();
     }
 
     @Test
     public void testBucketDoesNotExist() throws Exception {
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
"no-bucket", scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, "no-bucket", scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         try {
             offloader.offload(buildReadHandle(), UUID.randomUUID(), new 
HashMap<>()).get();
             Assert.fail("Shouldn't be able to add to bucket");
         } catch (ExecutionException e) {
-            Assert.assertTrue(e.getMessage().contains("NoSuchBucket"));
+            log.error("Exception: ", e.getMessage());
+            Assert.assertTrue(e.getMessage().contains("not found"));
         }
     }
 
     @Test
     public void testNoRegionConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -148,11 +152,11 @@ public void testNoRegionConfigured() throws Exception {
     @Test
     public void testNoBucketConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -162,13 +166,13 @@ public void testNoBucketConfigured() throws Exception {
     @Test
     public void testSmallBlockSizeConfigured() throws Exception {
         ServiceConfiguration conf = new ServiceConfiguration();
-        
conf.setManagedLedgerOffloadDriver(S3ManagedLedgerOffloader.DRIVER_NAME);
+        conf.setManagedLedgerOffloadDriver("s3");
         conf.setS3ManagedLedgerOffloadRegion("eu-west-1");
         conf.setS3ManagedLedgerOffloadBucket(BUCKET);
         conf.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(1024);
 
         try {
-            S3ManagedLedgerOffloader.create(conf, scheduler);
+            BlobStoreManagedLedgerOffloader.create(conf, scheduler);
             Assert.fail("Should have thrown exception");
         } catch (PulsarServerException pse) {
             // correct
@@ -178,7 +182,7 @@ public void testSmallBlockSizeConfigured() throws Exception 
{
     @Test
     public void testOffloadAndRead() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -213,21 +217,22 @@ public void testOffloadFailInitDataBlockUpload() throws 
Exception {
 
         // mock throw exception when initiateMultipartUpload
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+
+            BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).initiateMultipartUpload(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).initiateMultipartUpload(any(), any(), 
any());
 
-            LedgerOffloader offloader = new 
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
+            LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
                                                                      
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception when initiateMultipartUpload");
         } catch (Exception e) {
             // excepted
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -239,22 +244,21 @@ public void testOffloadFailDataBlockPartUpload() throws 
Exception {
 
         // mock throw exception when uploadPart
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException("fail 
DataBlockPartUpload"))
-                .when(mockS3client).uploadPart(any());
-            Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).uploadMultipartPart(any(), anyInt(), 
any());
 
-            LedgerOffloader offloader = new 
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
             Assert.fail("Should throw exception for when uploadPart");
         } catch (Exception e) {
             // excepted
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -266,22 +270,25 @@ public void testOffloadFailDataBlockUploadComplete() 
throws Exception {
 
         // mock throw exception when completeMultipartUpload
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
+            Mockito
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).completeMultipartUpload(any(), any());
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).completeMultipartUpload(any());
-            Mockito.doNothing().when(mockS3client).abortMultipartUpload(any());
+                .doNothing()
+                .when(spiedBlobStore).abortMultipartUpload(any());
 
-            LedgerOffloader offloader = new 
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
             Assert.fail("Should throw exception for when 
completeMultipartUpload");
         } catch (Exception e) {
             // excepted
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -293,21 +300,22 @@ public void testOffloadFailPutIndexBlock() throws 
Exception {
 
         // mock throw exception when putObject
         try {
-            AmazonS3 mockS3client = Mockito.spy(s3client);
+            BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
             Mockito
-                .doThrow(new AmazonServiceException(failureString))
-                .when(mockS3client).putObject(any());
+                .doThrow(new RuntimeException(failureString))
+                .when(spiedBlobStore).putBlob(any(), any());
 
-            LedgerOffloader offloader = new 
S3ManagedLedgerOffloader(mockS3client, BUCKET, scheduler,
-                                                                     
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
+            LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+                DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
+
             Assert.fail("Should throw exception for when putObject for index 
block");
         } catch (Exception e) {
             // excepted
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            Assert.assertTrue(e.getCause() instanceof RuntimeException);
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertFalse(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -328,7 +336,7 @@ public void testOffloadReadRandomAccess() throws Exception {
             randomAccesses[i][1] = second;
         }
 
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -360,7 +368,7 @@ public void testOffloadReadRandomAccess() throws Exception {
     @Test
     public void testOffloadReadInvalidEntryIds() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
@@ -385,46 +393,47 @@ public void testOffloadReadInvalidEntryIds() throws 
Exception {
     public void testDeleteOffloaded() throws Exception {
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
 
         // verify object exist after offload
         offloader.offload(readHandle, uuid, new HashMap<>()).get();
-        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
 
         // verify object deleted after delete
         offloader.deleteOffloaded(readHandle.getId(), uuid).get();
-        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
     }
 
     @Test
     public void testDeleteOffloadedFail() throws Exception {
+        String failureString = "fail deleteOffloaded";
         ReadHandle readHandle = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
-                                                                 
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
-        String failureString = "fail deleteOffloaded";
-        AmazonS3 mockS3client = Mockito.spy(s3client);
+        BlobStore spiedBlobStore = mock(BlobStore.class, 
delegatesTo(blobStore));
+
         Mockito
-            .doThrow(new AmazonServiceException(failureString))
-            .when(mockS3client).deleteObjects(any());
+            .doThrow(new RuntimeException(failureString))
+            .when(spiedBlobStore).removeBlobs(any(), any());
+
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(spiedBlobStore, BUCKET, scheduler,
+            DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
 
         try {
             // verify object exist after offload
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
 
             offloader.deleteOffloaded(readHandle.getId(), uuid).get();
         } catch (Exception e) {
             // expected
-            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
             
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
             // verify object still there.
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
-            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(blobStore.blobExists(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
         }
     }
 
@@ -441,7 +450,7 @@ public void testOffloadEmpty() throws Exception {
         Mockito.doReturn(1234L).when(readHandle).getId();
 
         UUID uuid = UUID.randomUUID();
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         try {
             offloader.offload(readHandle, uuid, new HashMap<>()).get();
@@ -454,48 +463,51 @@ public void testOffloadEmpty() throws Exception {
     @Test
     public void testReadUnknownDataVersion() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
         String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid);
-        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey);
-        
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(-12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, 
dataKey).withNewObjectMetadata(md));
+
+        Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, 
dataKey).getUserMetadata();
+        
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(-12345));
+        blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
+            log.error("Exception: ", e);
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
-            Assert.assertTrue(e.getCause().getMessage().contains("Invalid 
object version"));
+            Assert.assertTrue(e.getCause().getMessage().contains("Error 
reading from BlobStore"));
         }
 
-        
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, 
dataKey).withNewObjectMetadata(md));
+        
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(12345));
+        blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), 
uuid).get()) {
             toRead.readAsync(0, 0).get();
             Assert.fail("Shouldn't have been able to read");
         } catch (ExecutionException e) {
             Assert.assertEquals(e.getCause().getClass(), IOException.class);
-            Assert.assertTrue(e.getCause().getMessage().contains("Invalid 
object version"));
+            Assert.assertTrue(e.getCause().getMessage().contains("Error 
reading from BlobStore"));
         }
     }
 
     @Test
     public void testReadUnknownIndexVersion() throws Exception {
         ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
-        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler,
+        LedgerOffloader offloader = new 
BlobStoreManagedLedgerOffloader(blobStore, BUCKET, scheduler,
                                                                  
DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE);
         UUID uuid = UUID.randomUUID();
         offloader.offload(toWrite, uuid, new HashMap<>()).get();
 
         String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid);
-        ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey);
-        
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(-12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, 
indexKey).withNewObjectMetadata(md));
+
+        Map<String, String> userMeta = blobStore.blobMetadata(BUCKET, 
indexKey).getUserMetadata();
+        
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(-12345));
+        blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
             offloader.readOffloaded(toWrite.getId(), uuid).get();
@@ -505,8 +517,8 @@ public void testReadUnknownIndexVersion() throws Exception {
             Assert.assertTrue(e.getCause().getMessage().contains("Invalid 
object version"));
         }
 
-        
md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(12345));
-        s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, 
indexKey).withNewObjectMetadata(md));
+        
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, 
String.valueOf(12345));
+        blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey, 
CopyOptions.builder().userMetadata(userMeta).build());
 
         try {
             offloader.readOffloaded(toWrite.getId(), uuid).get();
diff --git 
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
 
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
index 6b8eaa427d..ec46571249 100644
--- 
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
+++ 
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-2-bookie-1-broker-unstarted-with-s3.yaml
@@ -148,7 +148,9 @@ pulsar-proxy*:
   networkMode: pulsarnet*
 
 s3*:
-  image: adobe/s3mock
+  ## use latest adobe/s3mock, for issue: 
https://github.com/adobe/S3Mock/issues/32
+  ## TODO: https://github.com/apache/incubator-pulsar/issues/2133
+  image: apachepulsar/s3mock
   await:
     strategy: org.apache.pulsar.tests.NoopAwaitStrategy
   env:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to