This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a10d93 GCS offload support(1): rename s3offload related classes to
be reuse-able (#2064)
7a10d93 is described below
commit 7a10d938a286a3d25114ee59f5cee63e1a57a795
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Jul 3 01:25:29 2018 +0800
GCS offload support(1): rename s3offload related classes to be reuse-able
(#2064)
This is the first part of the work to support `Google Cloud Storage` (GCS) offload.
Change:
Rename the offload-related classes and packages so that they are also reusable for GCS.
Master Issue: #2067
---
.../java/org/apache/pulsar/broker/PulsarService.java | 2 +-
.../BackedInputStream.java} | 4 ++--
.../BlockAwareSegmentInputStream.java | 3 +--
.../{s3offload => offload}/DataBlockHeader.java | 3 +--
.../{s3offload => offload}/OffloadIndexBlock.java | 2 +-
.../OffloadIndexBlockBuilder.java | 4 ++--
.../{s3offload => offload}/OffloadIndexEntry.java | 2 +-
.../impl/BlockAwareSegmentInputStreamImpl.java | 4 ++--
.../impl/DataBlockHeaderImpl.java | 4 ++--
.../impl/OffloadIndexBlockBuilderImpl.java | 6 +++---
.../impl/OffloadIndexBlockImpl.java | 7 +++----
.../impl/OffloadIndexEntryImpl.java | 4 ++--
.../impl/S3BackedInputStreamImpl.java | 8 ++++----
.../impl/S3BackedReadHandleImpl.java | 19 +++++++++----------
.../impl}/S3ManagedLedgerOffloader.java | 8 ++++----
.../S3BackedInputStreamTest.java | 18 ++++++++----------
.../pulsar/broker/{s3offload => offload}/S3Mock.java | 2 +-
.../broker/{s3offload => offload}/S3TestBase.java | 7 ++-----
.../impl/BlockAwareSegmentInputStreamTest.java | 6 ++----
.../impl/DataBlockHeaderTest.java | 4 ++--
.../{s3offload => offload}/impl/OffloadIndexTest.java | 8 ++++----
.../impl}/S3ManagedLedgerOffloaderTest.java | 12 ++++++------
22 files changed, 63 insertions(+), 74 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 484235f..2a341af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -61,7 +61,7 @@ import
org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader;
+import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
similarity index 90%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
index 28e6855..b596dc5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3BackedInputStream.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BackedInputStream.java
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import java.io.InputStream;
import java.io.IOException;
-public abstract class S3BackedInputStream extends InputStream {
+public abstract class BackedInputStream extends InputStream {
public abstract void seek(long position);
public abstract void seekForward(long position) throws IOException;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
similarity index 96%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
index 50d9e37..7ddd9cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/BlockAwareSegmentInputStream.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/BlockAwareSegmentInputStream.java
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
-import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.client.api.ReadHandle;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
similarity index 95%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
index f49b6a2..6e2021f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/DataBlockHeader.java
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
-import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
index 2bb59b4..e340283 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import java.io.Closeable;
import java.io.FilterInputStream;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
similarity index 95%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
index 126b4b9..7975a6f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexBlockBuilder.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl;
+import org.apache.pulsar.broker.offload.impl.OffloadIndexBlockBuilderImpl;
/**
* Interface for builder of index block used for offload a ledger to long term
storage.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
similarity index 97%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
index 6a976ff..dc98e42 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/OffloadIndexEntry.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
index 78d2380..06d2198 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static com.google.common.base.Preconditions.checkState;
@@ -32,7 +32,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.pulsar.broker.s3offload.BlockAwareSegmentInputStream;
+import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
similarity index 97%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
index 19a644a..4e3a5a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import com.google.common.io.CountingInputStream;
@@ -27,7 +27,7 @@ import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.offload.DataBlockHeader;
/**
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
similarity index 94%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
index 1b03f00..0b64de9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static com.google.common.base.Preconditions.checkState;
@@ -25,8 +25,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.apache.bookkeeper.client.api.LedgerMetadata;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
/**
* Interface for builder of index block used for offload a ledger to long term
storage.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
index e910150..c109035 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexBlockImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static com.google.common.base.Preconditions.checkState;
@@ -31,7 +31,6 @@ import io.netty.util.Recycler.Handle;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.FilterInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -47,8 +46,8 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
-import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.offload.OffloadIndexEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
similarity index 94%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
index b83de85..ff5e9ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/OffloadIndexEntryImpl.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
-import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.offload.OffloadIndexEntry;
/**
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
similarity index 95%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
index 65f2337..e55e61b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedInputStreamImpl.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
@@ -29,13 +29,13 @@ import io.netty.buffer.PooledByteBufAllocator;
import java.io.InputStream;
import java.io.IOException;
-import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
-import
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
+import org.apache.pulsar.broker.offload.BackedInputStream;
+import
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class S3BackedInputStreamImpl extends S3BackedInputStream {
+public class S3BackedInputStreamImpl extends BackedInputStream {
private static final Logger log =
LoggerFactory.getLogger(S3BackedInputStreamImpl.class);
private final AmazonS3 s3client;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
similarity index 92%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
index 65acbb8..08b5ea6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3BackedReadHandleImpl.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import io.netty.buffer.ByteBuf;
@@ -44,11 +43,11 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
-import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
-import org.apache.pulsar.broker.s3offload.S3BackedInputStream;
-import
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck;
+import org.apache.pulsar.broker.offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.offload.BackedInputStream;
+import
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.VersionCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,12 +57,12 @@ public class S3BackedReadHandleImpl implements ReadHandle {
private final long ledgerId;
private final OffloadIndexBlock index;
- private final S3BackedInputStream inputStream;
+ private final BackedInputStream inputStream;
private final DataInputStream dataStream;
private final ExecutorService executor;
private S3BackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
- S3BackedInputStream inputStream,
+ BackedInputStream inputStream,
ExecutorService executor) {
this.ledgerId = ledgerId;
this.index = index;
@@ -201,7 +200,7 @@ public class S3BackedReadHandleImpl implements ReadHandle {
OffloadIndexBlockBuilder indexBuilder =
OffloadIndexBlockBuilder.create();
OffloadIndexBlock index =
indexBuilder.fromStream(obj.getObjectContent());
- S3BackedInputStream inputStream = new
S3BackedInputStreamImpl(s3client, bucket, key,
+ BackedInputStream inputStream = new
S3BackedInputStreamImpl(s3client, bucket, key,
versionCheck,
index.getDataObjectLength(),
readBufferSize);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
similarity index 98%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
index f65eda9..ec74d27 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload.impl;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
@@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.base.Strings;
-import java.io.InputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@@ -44,8 +43,9 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
-import
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
-import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl;
+import org.apache.pulsar.broker.offload.BlockAwareSegmentInputStream;
+import org.apache.pulsar.broker.offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
similarity index 91%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
index 4b75869..45bca52 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3BackedInputStreamTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy;
@@ -34,11 +34,9 @@ import java.util.Random;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.s3offload.impl.S3BackedInputStreamImpl;
+import org.apache.pulsar.broker.offload.impl.S3BackedInputStreamImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
@@ -96,7 +94,7 @@ class S3BackedInputStreamTest extends S3TestBase {
metadata.setContentLength(objectSize);
s3client.putObject(BUCKET, objectKey, toWrite, metadata);
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
assertStreamsMatch(toTest, toCompare);
@@ -113,7 +111,7 @@ class S3BackedInputStreamTest extends S3TestBase {
metadata.setContentLength(objectSize);
s3client.putObject(BUCKET, objectKey, toWrite, metadata);
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
assertStreamsMatchByBytes(toTest, toCompare);
@@ -121,7 +119,7 @@ class S3BackedInputStreamTest extends S3TestBase {
@Test(expectedExceptions = IOException.class)
public void testErrorOnS3Read() throws Exception {
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, "doesn't exist",
+ BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, "doesn't exist",
(key, md) ->
{},
1234, 1000);
toTest.read();
@@ -147,7 +145,7 @@ class S3BackedInputStreamTest extends S3TestBase {
metadata.setContentLength(objectSize);
s3client.putObject(BUCKET, objectKey, toWrite, metadata);
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) {
@@ -167,7 +165,7 @@ class S3BackedInputStreamTest extends S3TestBase {
s3client.putObject(BUCKET, objectKey, toWrite, metadata);
AmazonS3 spiedClient = spy(s3client);
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient,
BUCKET, objectKey,
+ BackedInputStream toTest = new S3BackedInputStreamImpl(spiedClient,
BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
@@ -208,7 +206,7 @@ class S3BackedInputStreamTest extends S3TestBase {
metadata.setContentLength(objectSize);
s3client.putObject(BUCKET, objectKey, toWrite, metadata);
- S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
+ BackedInputStream toTest = new S3BackedInputStreamImpl(s3client,
BUCKET, objectKey,
(key, md) ->
{},
objectSize,
1000);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
similarity index 99%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
index 74d48e2..4bfc140 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3Mock.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3Mock.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
import com.amazonaws.services.s3.AbstractAmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
similarity index 84%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
index b56c850..f2ea6c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3TestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/S3TestBase.java
@@ -16,18 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload;
-import com.amazonaws.auth.AnonymousAWSCredentials;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.testng.annotations.BeforeMethod;
public class S3TestBase {
- final static String BUCKET = "pulsar-unittest";
+ public final static String BUCKET = "pulsar-unittest";
protected AmazonS3 s3client = null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
similarity index 99%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
index e6f5249..757a135 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/BlockAwareSegmentInputStreamTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -27,7 +27,6 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -35,7 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import lombok.Data;
@@ -45,7 +43,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.offload.DataBlockHeader;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
similarity index 96%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
index a658032..2f8a680 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/DataBlockHeaderTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -27,7 +27,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.offload.DataBlockHeader;
import org.testng.annotations.Test;
@Slf4j
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
similarity index 97%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
index 445ba01..f166bd7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/OffloadIndexTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload.impl;
+package org.apache.pulsar.broker.offload.impl;
import static com.google.common.base.Charsets.UTF_8;
import static org.testng.Assert.assertEquals;
@@ -36,9 +36,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
-import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
-import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.offload.OffloadIndexEntry;
import org.testng.annotations.Test;
@Slf4j
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
similarity index 98%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
index ab78f07..7b9d9a2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/offload/impl/S3ManagedLedgerOffloaderTest.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.s3offload;
+package org.apache.pulsar.broker.offload.impl;
-import static
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.dataBlockOffloadKey;
-import static
org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.indexBlockOffloadKey;
+import static
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.dataBlockOffloadKey;
+import static
org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader.indexBlockOffloadKey;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
@@ -27,7 +27,6 @@ import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.io.IOException;
import java.util.HashMap;
@@ -36,7 +35,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
@@ -53,7 +51,9 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.apache.pulsar.broker.offload.S3TestBase;
+import org.apache.pulsar.broker.offload.impl.DataBlockHeaderImpl;
+import org.apache.pulsar.broker.offload.impl.S3ManagedLedgerOffloader;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;