This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5980169 [tiered storage] store driver name and driver specific
metadata in original ledger metadata (#2398)
5980169 is described below
commit 59801695a9808c4c6574b421f90b0d91a9d712ad
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Aug 20 23:21:53 2018 -0700
[tiered storage] store driver name and driver specific metadata in
original ledger metadata (#2398)
### Motivation
1) Currently the location of an offloaded ledger isn't stored in the
original ledger metadata.
That means if configuration is changed or modified by mistake. We might
potentially cause data loss.
2) The location of an offloaded ledger is needed by Pulsar SQL. so it is
very inconvinient to
have the location information stored in a configuration and the approach is
also problematic.
### Changes
Store `driverName` and driver-specific metadata (e.g. bucket name, region
name, endpoint) in the
original ledger metadata. Change ManagedLedgerImpl to use the
driver-specific metadata to read
the offloaded ledger. If the driver-specific metadata is missed, it will
fall back to use the configuration.
### Tests
This change doesn't change the behavior. Existing unit tests and
integration tests already covered the logic.
### NOTES
Currently the driver name in metadata is not used. We need to use driver
name to load different offloader driver
after #2393 is implemented
---
.../apache/bookkeeper/mledger/LedgerOffloader.java | 33 ++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 69 ++++++-
.../mledger/impl/NullLedgerOffloader.java | 11 +-
.../bookkeeper/mledger/offload/OffloadUtils.java | 91 +++++++++
managed-ledger/src/main/proto/MLDataFormats.proto | 11 +
.../mledger/impl/OffloadPrefixReadTest.java | 27 ++-
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 16 +-
.../pulsar/broker/admin/AdminApiOffloadTest.java | 3 +
tiered-storage/jcloud/pom.xml | 2 +
.../impl/BlobStoreManagedLedgerOffloader.java | 221 ++++++++++++++++-----
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 19 +-
11 files changed, 419 insertions(+), 84 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 6885500..8fc35cc 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
import com.google.common.annotations.Beta;
+import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,25 @@ import org.apache.bookkeeper.client.api.ReadHandle;
*/
@Beta
public interface LedgerOffloader {
+
+ /**
+ * Get offload driver name.
+ *
+ * @return offload driver name.
+ */
+ String getOffloadDriverName();
+
+ /**
+ * Get offload driver metadata.
+ *
+ * <p>The driver metadata will be recorded as part of the metadata of the
original ledger.
+ *
+ * @return offload driver metadata.
+ */
+ default Map<String, String> getOffloadDriverMetadata() {
+ return Collections.emptyMap();
+ }
+
/**
* Offload the passed in ledger to longterm storage.
* Metadata passed in is for inspection purposes only and should be stored
@@ -51,10 +71,9 @@ public interface LedgerOffloader {
*
* @param ledger the ledger to offload
* @param uid unique id to identity this offload attempt
- * @param extraMetadata metadata to be stored with the ledger for
informational
+ * @param extraMetadata metadata to be stored with the offloaded ledger
for informational
* purposes
- * @return a future, which when completed, denotes that the offload has
been
- * successful
+ * @return a future, which when completed, denotes that the offload has
been successful.
*/
CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
@@ -69,9 +88,11 @@ public interface LedgerOffloader {
*
* @param ledgerId the ID of the ledger to load from longterm storage
* @param uid unique ID for previous successful offload attempt
+ * @param offloadDriverMetadata offload driver metadata
* @return a future, which when completed, returns a ReadHandle
*/
- CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid);
+ CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+ Map<String, String>
offloadDriverMetadata);
/**
* Delete a ledger from long term storage.
@@ -81,9 +102,11 @@ public interface LedgerOffloader {
*
* @param ledgerId the ID of the ledger to delete from longterm storage
* @param uid unique ID for previous offload attempt
+ * @param offloadDriverMetadata offload driver metadata
* @return a future, which when completed, signifies that the ledger has
* been deleted
*/
- CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid);
+ CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+ Map<String, String>
offloadDriverMetadata);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 388cdef..6e4a609 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
@@ -1390,7 +1391,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (info != null && info.hasOffloadContext() &&
info.getOffloadContext().getComplete()) {
UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
info.getOffloadContext().getUidLsb());
- openFuture =
config.getLedgerOffloader().readOffloaded(ledgerId, uid);
+ // TODO: improve this to load ledger offloader by driver
name recorded in metadata
+ openFuture = config.getLedgerOffloader()
+ .readOffloaded(ledgerId, uid,
OffloadUtils.getOffloadDriverMetadata(info));
} else {
openFuture = bookKeeper.newOpenLedgerOp()
.withRecovery(!isReadOnly())
@@ -1771,7 +1774,16 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+ newInfoBuilder.getOffloadContextBuilder()
+ .setBookkeeperDeleted(true);
+ String driverName = OffloadUtils.getOffloadDriverName(
+ ls, config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata =
OffloadUtils.getOffloadDriverMetadata(
+ ls,
config.getLedgerOffloader().getOffloadDriverMetadata());
+ OffloadUtils.setOffloadDriverMetadata(
+ newInfoBuilder,
+ driverName, driverMetadata
+ );
ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
}
@@ -1903,7 +1915,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (info.getOffloadContext().hasUidMsb()) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
info.getOffloadContext().getUidLsb());
- cleanupOffloaded(ledgerId, uuid, "Trimming");
+ cleanupOffloaded(
+ ledgerId, uuid,
+ OffloadUtils.getOffloadDriverName(info,
config.getLedgerOffloader().getOffloadDriverName()),
+ OffloadUtils.getOffloadDriverMetadata(info,
config.getLedgerOffloader().getOffloadDriverMetadata()),
+ "Trimming");
}
}
@@ -2105,7 +2121,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
UUID uuid = UUID.randomUUID();
Map<String, String> extraMetadata =
ImmutableMap.of("ManagedLedgerName", name);
- prepareLedgerInfoForOffloaded(ledgerId, uuid)
+ String driverName =
config.getLedgerOffloader().getOffloadDriverName();
+ Map<String, String> driverMetadata =
config.getLedgerOffloader().getOffloadDriverMetadata();
+
+ prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName,
driverMetadata)
.thenCompose((ignore) -> getLedgerHandle(ledgerId))
.thenCompose(readHandle ->
config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
.thenCompose((ignore) -> {
@@ -2116,7 +2135,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
scheduledExecutor, name)
.whenComplete((ignore2, exception) -> {
if (exception != null) {
- cleanupOffloaded(ledgerId, uuid,
"Metastore failure");
+ cleanupOffloaded(
+ ledgerId, uuid,
+ driverName, driverMetadata,
+ "Metastore failure");
}
});
})
@@ -2216,7 +2238,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
- private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long
ledgerId, UUID uuid) {
+ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long
ledgerId,
+ UUID uuid,
+ String
offloadDriverName,
+ Map<String,
String> offloadDriverMetadata) {
log.info("[{}] Preparing metadata to offload ledger {} with uuid {}",
name, ledgerId, uuid);
return transformLedgerInfo(ledgerId,
(oldInfo) -> {
@@ -2225,12 +2250,24 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
oldInfo.getOffloadContext().getUidLsb());
log.info("[{}] Found previous
offload attempt for ledger {}, uuid {}"
+ ", cleaning up", name,
ledgerId, uuid);
- cleanupOffloaded(ledgerId, oldUuid,
"Previous failed offload");
+ cleanupOffloaded(
+ ledgerId,
+ oldUuid,
+
OffloadUtils.getOffloadDriverName(oldInfo,
+
config.getLedgerOffloader().getOffloadDriverName()),
+
OffloadUtils.getOffloadDriverMetadata(oldInfo,
+
config.getLedgerOffloader().getOffloadDriverMetadata()),
+ "Previous failed offload");
}
LedgerInfo.Builder builder =
oldInfo.toBuilder();
builder.getOffloadContextBuilder()
.setUidMsb(uuid.getMostSignificantBits())
.setUidLsb(uuid.getLeastSignificantBits());
+ OffloadUtils.setOffloadDriverMetadata(
+ builder,
+ offloadDriverName,
+ offloadDriverMetadata
+ );
return builder.build();
})
.whenComplete((result, exception) -> {
@@ -2254,6 +2291,16 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
builder.getOffloadContextBuilder()
.setTimestamp(clock.millis())
.setComplete(true);
+
+ String driverName =
OffloadUtils.getOffloadDriverName(
+ oldInfo,
config.getLedgerOffloader().getOffloadDriverName());
+ Map<String, String> driverMetadata
= OffloadUtils.getOffloadDriverMetadata(
+ oldInfo,
config.getLedgerOffloader().getOffloadDriverMetadata());
+
OffloadUtils.setOffloadDriverMetadata(
+ builder,
+ driverName,
+ driverMetadata
+ );
return builder.build();
} else {
throw new OffloadConflict(
@@ -2272,10 +2319,14 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
});
}
- private void cleanupOffloaded(long ledgerId, UUID uuid, String
cleanupReason) {
+ private void cleanupOffloaded(long ledgerId,
+ UUID uuid,
+ String offloadDriverName, /* TODO: use
driver name to identify offloader */
+ Map<String, String> offloadDriverMetadata,
+ String cleanupReason) {
Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toHours(1)).limit(10),
Retries.NonFatalPredicate,
- () ->
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid),
+ () ->
config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid,
offloadDriverMetadata),
scheduledExecutor, name)
.whenComplete((ignored, exception) -> {
if (exception != null) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index cd76a1b..3401f1b 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -32,6 +32,11 @@ public class NullLedgerOffloader implements LedgerOffloader {
public static NullLedgerOffloader INSTANCE = new NullLedgerOffloader();
@Override
+ public String getOffloadDriverName() {
+ return "NullLedgerOffloader";
+ }
+
+ @Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Map<String, String> extraMetadata) {
@@ -41,14 +46,16 @@ public class NullLedgerOffloader implements LedgerOffloader
{
}
@Override
- public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uid) {
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+ Map<String, String>
offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}
@Override
- public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+ Map<String, String>
offloadDriverMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
new file mode 100644
index 0000000..44ebc80
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue;
+import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata;
+
+public final class OffloadUtils {
+
+ private OffloadUtils() {}
+
+ public static Map<String, String> getOffloadDriverMetadata(LedgerInfo
ledgerInfo) {
+ Map<String, String> metadata = Maps.newHashMap();
+ if (ledgerInfo.hasOffloadContext()) {
+ OffloadContext ctx = ledgerInfo.getOffloadContext();
+ if (ctx.hasDriverMetadata()) {
+ OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+ if (driverMetadata.getPropertiesCount() > 0) {
+ driverMetadata.getPropertiesList().forEach(kv ->
metadata.put(kv.getKey(), kv.getValue()));
+ }
+ }
+ }
+ return metadata;
+ }
+
+ public static Map<String, String> getOffloadDriverMetadata(LedgerInfo
ledgerInfo,
+ Map<String,
String> defaultOffloadDriverMetadata) {
+ if (ledgerInfo.hasOffloadContext()) {
+ OffloadContext ctx = ledgerInfo.getOffloadContext();
+ if (ctx.hasDriverMetadata()) {
+ OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+ if (driverMetadata.getPropertiesCount() > 0) {
+ Map<String, String> metadata = Maps.newHashMap();
+ driverMetadata.getPropertiesList().forEach(kv ->
metadata.put(kv.getKey(), kv.getValue()));
+ return metadata;
+ }
+ }
+ }
+ return defaultOffloadDriverMetadata;
+ }
+
+ public static String getOffloadDriverName(LedgerInfo ledgerInfo, String
defaultDriverName) {
+ if (ledgerInfo.hasOffloadContext()) {
+ OffloadContext ctx = ledgerInfo.getOffloadContext();
+ if (ctx.hasDriverMetadata()) {
+ OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
+ if (driverMetadata.hasName()) {
+ return driverMetadata.getName();
+ }
+ }
+ }
+ return defaultDriverName;
+ }
+
+ public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder,
+ String driverName,
+ Map<String, String>
offloadDriverMetadata) {
+ infoBuilder.getOffloadContextBuilder()
+ .getDriverMetadataBuilder()
+ .setName(driverName);
+ offloadDriverMetadata.forEach((k, v) -> {
+ infoBuilder.getOffloadContextBuilder()
+ .getDriverMetadataBuilder()
+ .addProperties(KeyValue.newBuilder()
+ .setKey(k)
+ .setValue(v)
+ .build());
+ });
+ }
+
+}
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto
b/managed-ledger/src/main/proto/MLDataFormats.proto
index 0d5ad3a..4dbd231 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -21,12 +21,23 @@ syntax = "proto2";
option java_package = "org.apache.bookkeeper.mledger.proto";
option optimize_for = SPEED;
+message KeyValue {
+ required string key = 1;
+ required string value = 2;
+}
+
+message OffloadDriverMetadata {
+ required string name = 1;
+ repeated KeyValue properties = 2;
+}
+
message OffloadContext {
optional int64 uidMsb = 1;
optional int64 uidLsb = 2;
optional bool complete = 3;
optional bool bookkeeperDeleted = 4;
optional int64 timestamp = 5;
+ optional OffloadDriverMetadata driverMetadata = 6;
}
message ManagedLedgerInfo {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 8bbb44a..4bf518f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.spy;
@@ -30,6 +31,7 @@ import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -51,7 +53,6 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -98,25 +99,33 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
- verify(offloader, times(1)).readOffloaded(anyLong(), anyObject());
- verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID));
+ verify(offloader, times(1))
+ .readOffloaded(anyLong(), anyObject(), anyMap());
+ verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID),
anyMap());
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
- verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
- verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID));
+ verify(offloader, times(2))
+ .readOffloaded(anyLong(), anyObject(), anyMap());
+ verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID),
anyMap());
for (Entry e : cursor.readEntries(5)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
- verify(offloader, times(2)).readOffloaded(anyLong(), anyObject());
+ verify(offloader, times(2))
+ .readOffloaded(anyLong(), anyObject(), anyMap());
}
static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new
ConcurrentHashMap<UUID, ReadHandle>();
@Override
+ public String getOffloadDriverName() {
+ return "mock";
+ }
+
+ @Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String>
extraMetadata) {
@@ -131,12 +140,14 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
}
@Override
- public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uuid) {
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uuid,
+ Map<String, String>
offloadDriverMetadata) {
return CompletableFuture.completedFuture(offloads.get(uuid));
}
@Override
- public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID
uuid) {
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID
uuid,
+ Map<String, String>
offloadDriverMetadata) {
offloads.remove(uuid);
return CompletableFuture.completedFuture(null);
};
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 6d21ee2..351d86b 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -493,9 +493,10 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
}
@Override
- public CompletableFuture<Void> deleteOffloaded(long ledgerId,
UUID uuid) {
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId,
UUID uuid,
+ Map<String,
String> offloadDriverMetadata) {
deleted.add(Pair.of(ledgerId, uuid));
- return super.deleteOffloaded(ledgerId, uuid);
+ return super.deleteOffloaded(ledgerId, uuid,
offloadDriverMetadata);
}
};
ManagedLedgerConfig config = new ManagedLedgerConfig();
@@ -929,6 +930,11 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
}
@Override
+ public String getOffloadDriverName() {
+ return "mock";
+ }
+
+ @Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String>
extraMetadata) {
@@ -942,14 +948,16 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
}
@Override
- public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uuid) {
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID
uuid,
+ Map<String, String>
offloadDriverMetadata) {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}
@Override
- public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID
uuid) {
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID
uuid,
+ Map<String, String>
offloadDriverMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (offloads.remove(ledgerId, uuid)) {
deletes.put(ledgerId, uuid);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index dc925a2..63b4d84 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Sets;
@@ -78,6 +79,8 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
private void testOffload(String topicName, String mlName) throws Exception
{
LedgerOffloader offloader = mock(LedgerOffloader.class);
+ when(offloader.getOffloadDriverName()).thenReturn("mock");
+
doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
CompletableFuture<Void> promise = new CompletableFuture<>();
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index eb4636f..fcd3300 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -42,6 +42,7 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>jclouds-shaded</artifactId>
<version>${project.version}</version>
+ <!--
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
@@ -68,6 +69,7 @@
<artifactId>*</artifactId>
</exclusion>
</exclusions>
+ -->
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
diff --git
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index e09a66b..f96afaf 100644
---
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -36,6 +36,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import lombok.Data;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -43,6 +46,7 @@ import
org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import
org.apache.bookkeeper.mledger.offload.jcloud.TieredStorageConfigurationData;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.commons.lang3.tuple.Pair;
import org.jclouds.Constants;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
@@ -66,6 +70,10 @@ import org.slf4j.LoggerFactory;
public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private static final Logger log =
LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);
+ private static final String METADATA_FIELD_BUCKET = "bucket";
+ private static final String METADATA_FIELD_REGION = "region";
+ private static final String METADATA_FIELD_ENDPOINT = "endpoint";
+
public static final String[] DRIVER_NAMES = {"S3", "aws-s3",
"google-cloud-storage"};
// use these keys for both s3 and gcs.
@@ -91,6 +99,42 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
blobBuilder.userMetadata(metadataBuilder.build());
}
+ @Data(staticConstructor = "of")
+ private static class BlobStoreLocation {
+ private final String region;
+ private final String endpoint;
+ }
+
+ private static Pair<BlobStoreLocation, BlobStore> createBlobStore(String
driver,
+ String
region,
+ String
endpoint,
+
Credentials credentials,
+ int
maxBlockSize) {
+ Properties overrides = new Properties();
+ // This property controls the number of parts being uploaded in
parallel.
+ overrides.setProperty("jclouds.mpu.parallel.degree", "1");
+ overrides.setProperty("jclouds.mpu.parts.size",
Integer.toString(maxBlockSize));
+ overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
+ overrides.setProperty(Constants.PROPERTY_MAX_RETRIES,
Integer.toString(100));
+
+ ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
+ contextBuilder.credentials(credentials.identity,
credentials.credential);
+
+ if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
+ contextBuilder.endpoint(endpoint);
+
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
+ }
+ contextBuilder.overrides(overrides);
+ BlobStoreContext context =
contextBuilder.buildView(BlobStoreContext.class);
+ BlobStore blobStore = context.getBlobStore();
+
+ log.info("Connect to blobstore : driver: {}, region: {}, endpoint: {}",
+ driver, region, endpoint);
+ return Pair.of(
+ BlobStoreLocation.of(region, endpoint),
+ blobStore);
+ }
+
private final VersionCheck VERSION_CHECK = (key, blob) -> {
// NOTE all metadata in jclouds comes out as lowercase, in an effort
to normalize the providers
String version =
blob.getMetadata().getUserMetadata().get(METADATA_FORMAT_VERSION_KEY.toLowerCase());
@@ -102,16 +146,28 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
private final OrderedScheduler scheduler;
- // container in jclouds
- private final String bucket;
+ // container in jclouds to write offloaded ledgers
+ private final String writeBucket;
+ // the region to write offloaded ledgers
+ private final String writeRegion;
+ // the endpoint
+ private final String writeEndpoint;
+ // credentials
+ private final Credentials credentials;
+
// max block size for each data block.
private int maxBlockSize;
private final int readBufferSize;
- private BlobStoreContext context;
- private BlobStore blobStore;
- Location location = null;
+ private final BlobStore writeBlobStore;
+ private final Location writeLocation;
+
+ private final ConcurrentMap<BlobStoreLocation, BlobStore> readBlobStores =
new ConcurrentHashMap<>();
+
+ // metadata to be stored as part of the offloaded ledger metadata
private final Map<String, String> userMetadata;
+ // offload driver metadata to be stored as part of the original ledger
metadata
+ private final String offloadDriverName;
@VisibleForTesting
static BlobStoreManagedLedgerOffloader
create(TieredStorageConfigurationData conf,
@@ -124,6 +180,9 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
OrderedScheduler
scheduler)
throws IOException {
String driver = conf.getManagedLedgerOffloadDriver();
+ if ("s3".equals(driver.toLowerCase())) {
+ driver = "aws-s3";
+ }
if (!driverSupported(driver)) {
throw new IOException(
"Not support this kind of driver as offload backend: " +
driver);
@@ -217,35 +276,34 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
int maxBlockSize, int readBufferSize,
String endpoint, String region,
Credentials credentials,
Map<String, String> userMetadata) {
+ this.offloadDriverName = driver;
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;
- this.bucket = container;
+ this.writeBucket = container;
+ this.writeRegion = region;
+ this.writeEndpoint = endpoint;
this.maxBlockSize = maxBlockSize;
this.userMetadata = userMetadata;
+ this.credentials = credentials;
- Properties overrides = new Properties();
- // This property controls the number of parts being uploaded in
parallel.
- overrides.setProperty("jclouds.mpu.parallel.degree", "1");
- overrides.setProperty("jclouds.mpu.parts.size",
Integer.toString(maxBlockSize));
- overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
- overrides.setProperty(Constants.PROPERTY_MAX_RETRIES,
Integer.toString(100));
-
- ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
- contextBuilder.credentials(credentials.identity,
credentials.credential);
-
- if (isS3Driver(driver) && !Strings.isNullOrEmpty(endpoint)) {
- contextBuilder.endpoint(endpoint);
-
overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
- }
if (!Strings.isNullOrEmpty(region)) {
- this.location = new
LocationBuilder().scope(LocationScope.REGION).id(region).description(region).build();
+ this.writeLocation = new LocationBuilder()
+ .scope(LocationScope.REGION)
+ .id(region)
+ .description(region)
+ .build();
+ } else {
+ this.writeLocation = null;
}
- log.info("Constructor driver: {}, host: {}, container: {}, region: {}
", driver, endpoint, bucket, region);
+ log.info("Constructor offload driver: {}, host: {}, container: {},
region: {} ",
+ driver, endpoint, container, region);
- contextBuilder.overrides(overrides);
- this.context = contextBuilder.buildView(BlobStoreContext.class);
- this.blobStore = context.getBlobStore();
+ Pair<BlobStoreLocation, BlobStore> blobStore = createBlobStore(
+ driver, region, endpoint, credentials, maxBlockSize
+ );
+ this.writeBlobStore = blobStore.getRight();
+ this.readBlobStores.put(blobStore.getLeft(), blobStore.getRight());
}
// build context for jclouds BlobStoreContext, mostly used in test
@@ -258,12 +316,22 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
BlobStoreManagedLedgerOffloader(BlobStore blobStore, String container,
OrderedScheduler scheduler,
int maxBlockSize, int readBufferSize,
Map<String, String> userMetadata) {
+ this.offloadDriverName = "aws-s3";
this.scheduler = scheduler;
this.readBufferSize = readBufferSize;
- this.bucket = container;
+ this.writeBucket = container;
+ this.writeRegion = null;
+ this.writeEndpoint = null;
this.maxBlockSize = maxBlockSize;
- this.blobStore = blobStore;
+ this.writeBlobStore = blobStore;
+ this.writeLocation = null;
this.userMetadata = userMetadata;
+ this.credentials = null;
+
+ readBlobStores.put(
+ BlobStoreLocation.of(writeRegion, writeEndpoint),
+ blobStore
+ );
}
static String dataBlockOffloadKey(long ledgerId, UUID uuid) {
@@ -274,12 +342,26 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
}
- public boolean createBucket() {
- return blobStore.createContainerInLocation(location, bucket);
+ public boolean createBucket(String bucket) {
+ return writeBlobStore.createContainerInLocation(writeLocation, bucket);
+ }
+
+ public void deleteBucket(String bucket) {
+ writeBlobStore.deleteContainer(bucket);
+ }
+
+ @Override
+ public String getOffloadDriverName() {
+ return offloadDriverName;
}
- public void deleteBucket() {
- blobStore.deleteContainer(bucket);
+ @Override
+ public Map<String, String> getOffloadDriverMetadata() {
+ return ImmutableMap.of(
+ METADATA_FIELD_BUCKET, writeBucket,
+ METADATA_FIELD_REGION, writeRegion,
+ METADATA_FIELD_ENDPOINT, writeEndpoint
+ );
}
// upload DataBlock to s3 using MultiPartUpload, and indexBlock in a new
Block,
@@ -305,10 +387,10 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
// init multi part upload for data block.
try {
- BlobBuilder blobBuilder = blobStore.blobBuilder(dataBlockKey);
+ BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
addVersionInfo(blobBuilder, userMetadata);
Blob blob = blobBuilder.build();
- mpu = blobStore.initiateMultipartUpload(bucket, blob.getMetadata(), new PutOptions());
+ mpu = writeBlobStore.initiateMultipartUpload(writeBucket, blob.getMetadata(), new PutOptions());
} catch (Throwable t) {
promise.completeExceptionally(t);
return;
@@ -330,9 +412,9 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
Payload partPayload =
Payloads.newInputStreamPayload(blockStream);
partPayload.getContentMetadata().setContentLength((long)blockSize);
partPayload.getContentMetadata().setContentType("application/octet-stream");
- parts.add(blobStore.uploadMultipartPart(mpu, partId, partPayload));
+ parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload));
log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
- bucket, dataBlockKey, partId, mpu.id());
+ writeBucket, dataBlockKey, partId, mpu.id());
indexBuilder.addBlock(startEntry, partId, blockSize);
@@ -349,16 +431,16 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
dataObjectLength += blockSize;
}
- blobStore.completeMultipartUpload(mpu, parts);
+ writeBlobStore.completeMultipartUpload(mpu, parts);
mpu = null;
} catch (Throwable t) {
try {
if (mpu != null) {
- blobStore.abortMultipartUpload(mpu);
+ writeBlobStore.abortMultipartUpload(mpu);
}
} catch (Throwable throwable) {
log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
- bucket, dataBlockKey, mpu.id(), throwable);
+ writeBucket, dataBlockKey, mpu.id(), throwable);
}
promise.completeExceptionally(t);
return;
@@ -368,7 +450,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
try (OffloadIndexBlock index =
indexBuilder.withDataObjectLength(dataObjectLength).build();
OffloadIndexBlock.IndexInputStream indexStream =
index.toStream()) {
// write the index block
- BlobBuilder blobBuilder = blobStore.blobBuilder(indexBlockKey);
+ BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
addVersionInfo(blobBuilder, userMetadata);
Payload indexPayload =
Payloads.newInputStreamPayload(indexStream);
indexPayload.getContentMetadata().setContentLength((long)indexStream.getStreamSize());
@@ -379,14 +461,14 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
.contentLength((long)indexStream.getStreamSize())
.build();
- blobStore.putBlob(bucket, blob);
+ writeBlobStore.putBlob(writeBucket, blob);
promise.complete(null);
} catch (Throwable t) {
try {
- blobStore.removeBlob(bucket, dataBlockKey);
+ writeBlobStore.removeBlob(writeBucket, dataBlockKey);
} catch (Throwable throwable) {
log.error("Failed deleteObject in bucket - {} with key - {}.",
- bucket, dataBlockKey, throwable);
+ writeBucket, dataBlockKey, throwable);
}
promise.completeExceptionally(t);
return;
@@ -395,16 +477,57 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
return promise;
}
+ String getReadRegion(Map<String, String> offloadDriverMetadata) {
+ return offloadDriverMetadata.getOrDefault(METADATA_FIELD_REGION, writeRegion);
+ }
+
+ String getReadBucket(Map<String, String> offloadDriverMetadata) {
+ return offloadDriverMetadata.getOrDefault(METADATA_FIELD_BUCKET, writeBucket);
+ }
+
+ String getReadEndpoint(Map<String, String> offloadDriverMetadata) {
+ return offloadDriverMetadata.getOrDefault(METADATA_FIELD_ENDPOINT, writeEndpoint);
+ }
+
+ BlobStore getReadBlobStore(Map<String, String> offloadDriverMetadata) {
+ BlobStoreLocation location = BlobStoreLocation.of(
+ getReadRegion(offloadDriverMetadata),
+ getReadEndpoint(offloadDriverMetadata)
+ );
+ BlobStore blobStore = readBlobStores.get(location);
+ if (null == blobStore) {
+ blobStore = createBlobStore(
+ offloadDriverName,
+ location.getRegion(),
+ location.getEndpoint(),
+ credentials,
+ maxBlockSize
+ ).getRight();
+ BlobStore existingBlobStore = readBlobStores.putIfAbsent(location, blobStore);
+ if (null == existingBlobStore) {
+ return blobStore;
+ } else {
+ return existingBlobStore;
+ }
+ } else {
+ return blobStore;
+ }
+ }
+
@Override
- public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid) {
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+ Map<String, String> offloadDriverMetadata) {
+ String readBucket = getReadBucket(offloadDriverMetadata);
+ BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = dataBlockOffloadKey(ledgerId, uid);
String indexKey = indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).submit(() -> {
try {
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
- blobStore,
- bucket, key, indexKey,
+ readBlobstore,
+ readBucket, key, indexKey,
VERSION_CHECK,
ledgerId,
readBufferSize));
} catch (Throwable t) {
@@ -418,11 +541,15 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
@Override
- public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+ Map<String, String> offloadDriverMetadata) {
+ String readBucket = getReadBucket(offloadDriverMetadata);
+ BlobStore readBlobstore = getReadBlobStore(offloadDriverMetadata);
+
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(ledgerId).submit(() -> {
try {
- blobStore.removeBlobs(bucket,
+ readBlobstore.removeBlobs(readBucket,
ImmutableList.of(dataBlockOffloadKey(ledgerId, uid),
indexBlockOffloadKey(ledgerId, uid)));
promise.complete(null);
} catch (Throwable t) {
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index fca1ef2..eb88d37 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -30,6 +30,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -252,7 +253,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
- ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(),
toWrite.getLastAddConfirmed());
try (LedgerEntries toWriteEntries = toWrite.read(0,
toWrite.getLastAddConfirmed());
@@ -406,7 +407,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
- ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(),
toWrite.getLastAddConfirmed());
for (long[] access : randomAccesses) {
@@ -438,7 +439,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
UUID uuid = UUID.randomUUID();
offloader.offload(toWrite, uuid, new HashMap<>()).get();
- ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid).get();
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.assertEquals(toTest.getLastAddConfirmed(),
toWrite.getLastAddConfirmed());
try {
@@ -467,7 +468,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
Assert.assertTrue(blobStore.blobExists(BUCKET,
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(),
uuid)));
// verify object deleted after delete
- offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+ offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get();
Assert.assertFalse(blobStore.blobExists(BUCKET,
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
Assert.assertFalse(blobStore.blobExists(BUCKET,
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(),
uuid)));
}
@@ -492,7 +493,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
Assert.assertTrue(blobStore.blobExists(BUCKET,
BlobStoreManagedLedgerOffloader.dataBlockOffloadKey(readHandle.getId(), uuid)));
Assert.assertTrue(blobStore.blobExists(BUCKET,
BlobStoreManagedLedgerOffloader.indexBlockOffloadKey(readHandle.getId(),
uuid)));
- offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+ offloader.deleteOffloaded(readHandle.getId(), uuid, Collections.emptyMap()).get();
} catch (Exception e) {
// expected
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
@@ -542,7 +543,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
String.valueOf(-12345));
blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey,
CopyOptions.builder().userMetadata(userMeta).build());
- try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+ try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) {
toRead.readAsync(0, 0).get();
Assert.fail("Shouldn't have been able to read");
} catch (ExecutionException e) {
@@ -554,7 +555,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
userMeta.put(BlobStoreManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY.toLowerCase(),
String.valueOf(12345));
blobStore.copyBlob(BUCKET, dataKey, BUCKET, dataKey,
CopyOptions.builder().userMetadata(userMeta).build());
- try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) {
+ try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get()) {
toRead.readAsync(0, 0).get();
Assert.fail("Shouldn't have been able to read");
} catch (ExecutionException e) {
@@ -581,7 +582,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey,
CopyOptions.builder().userMetadata(userMeta).build());
try {
- offloader.readOffloaded(toWrite.getId(), uuid).get();
+ offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.fail("Shouldn't have been able to open");
} catch (ExecutionException e) {
Assert.assertEquals(e.getCause().getClass(), IOException.class);
@@ -592,7 +593,7 @@ class BlobStoreManagedLedgerOffloaderTest extends
BlobStoreTestBase {
blobStore.copyBlob(BUCKET, indexKey, BUCKET, indexKey,
CopyOptions.builder().userMetadata(userMeta).build());
try {
- offloader.readOffloaded(toWrite.getId(), uuid).get();
+ offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
Assert.fail("Shouldn't have been able to open");
} catch (ExecutionException e) {
Assert.assertEquals(e.getCause().getClass(), IOException.class);