This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c09f5c Configurable data source for offloaded messages (#8717)
7c09f5c is described below
commit 7c09f5ce649edcca0be792198d97573197c5a272
Author: Renkai Ge <[email protected]>
AuthorDate: Fri Jan 8 13:01:17 2021 +0800
Configurable data source for offloaded messages (#8717)
Fix issue: https://github.com/apache/pulsar/issues/8591
This PR include:
* API change in command tools
* Related implementation with tests
* Related docs in cookbook
By the way log4j dependency is removed for module `managed-ledger` because
now the whole project use log4j2 as the default logger framework.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 ++-
managed-ledger/src/main/proto/MLDataFormats.proto | 4 +-
.../mledger/impl/OffloadPrefixReadTest.java | 125 ++++++++++++++++++---
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 10 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 26 +++--
.../pulsar/broker/admin/AdminApiOffloadTest.java | 8 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 13 ++-
.../common/naming/ServiceConfigurationTest.java | 3 +-
.../configurations/pulsar_broker_test.conf | 5 +-
pulsar-broker/src/test/resources/logback.xml | 2 +-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 40 +++++--
.../org/apache/pulsar/admin/cli/CmdTopics.java | 27 ++++-
.../common/policies/data/OffloadPolicies.java | 92 ++++++++++++---
.../common/policies/data/OffloadPoliciesTest.java | 9 +-
site2/docs/cookbooks-tiered-storage.md | 10 ++
16 files changed, 312 insertions(+), 86 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e295566..7713545 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -123,6 +123,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
+import
org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
@@ -1692,7 +1693,18 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
LedgerInfo info = ledgers.get(ledgerId);
CompletableFuture<ReadHandle> openFuture = new
CompletableFuture<>();
- if (info != null && info.hasOffloadContext() &&
info.getOffloadContext().getComplete()) {
+
+ if (config.getLedgerOffloader() != null
+ && config.getLedgerOffloader().getOffloadPolicies() != null
+ && config.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadedReadPriority() ==
OffloadedReadPriority.BOOKKEEPER_FIRST
+ && info != null && info.hasOffloadContext()
+ && !info.getOffloadContext().getBookkeeperDeleted()) {
+ openFuture =
bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
+
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+
+ } else if (info != null && info.hasOffloadContext() &&
info.getOffloadContext().getComplete()) {
+
UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
info.getOffloadContext().getUidLsb());
// TODO: improve this to load ledger offloader by driver name
recorded in metadata
Map<String, String> offloadDriverMetadata =
OffloadUtils.getOffloadDriverMetadata(info);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto
b/managed-ledger/src/main/proto/MLDataFormats.proto
index 8b1ecbf8..b7dc580 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -48,8 +48,8 @@ message ManagedLedgerInfo {
optional int64 timestamp = 4;
optional OffloadContext offloadContext = 5;
}
-
- repeated LedgerInfo ledgerInfo = 1;
+
+ repeated LedgerInfo ledgerInfo = 1;
// If present, it signals the managed ledger has been
// terminated and this was the position of the last
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index d3d24b2..69011cac 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -18,19 +18,20 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static
org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-
import io.netty.buffer.ByteBuf;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -39,7 +40,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -52,10 +52,12 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import
org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -69,53 +71,140 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);
- ManagedLedgerImpl ledger =
(ManagedLedgerImpl)factory.open("my_test_ledger", config);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
for (int i = 0; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
ledger.offloadPrefix(ledger.getLastConfirmedEntry());
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
- Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
- .filter(e ->
e.getOffloadContext().getComplete()).count(), 2);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
UUID firstLedgerUUID = new
UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
-
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
+
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
UUID secondLedgerUUID = new
UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
-
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
+
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
ManagedCursor cursor =
ledger.newNonDurableCursor(PositionImpl.earliest);
int i = 0;
for (Entry e : cursor.readEntries(10)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
.readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID),
anyMap());
for (Entry e : cursor.readEntries(10)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID),
anyMap());
for (Entry e : cursor.readEntries(5)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
+ }
+
+ @Test
+ public void testBookkeeperFirstOffloadRead() throws Exception {
+ MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
+ MockClock clock = new MockClock();
+ offloader.getOffloadPolicies()
+
.setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
+ //delete after 5 minutes
+ offloader.getOffloadPolicies()
+ .setManagedLedgerOffloadDeletionLagInMillis(300000L);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+ config.setClock(clock);
+
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_bookkeeper_first_test_ledger", config);
+
+ for (int i = 0; i < 25; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
+
+ LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0);
+ Assert.assertTrue(firstLedger.getOffloadContext().getComplete());
+ LedgerInfo secondLedger;
+ secondLedger = ledger.getLedgersInfoAsList().get(1);
+ Assert.assertTrue(secondLedger.getOffloadContext().getComplete());
+
+ UUID firstLedgerUUID = new
UUID(firstLedger.getOffloadContext().getUidMsb(),
+ firstLedger.getOffloadContext().getUidLsb());
+ UUID secondLedgerUUID = new
UUID(secondLedger.getOffloadContext().getUidMsb(),
+ secondLedger.getOffloadContext().getUidLsb());
+
+ ManagedCursor cursor =
ledger.newNonDurableCursor(PositionImpl.earliest);
+ int i = 0;
+ for (Entry e : cursor.readEntries(10)) {
+ Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+ // For offloaded first and not deleted ledgers, they should be read
from bookkeeper.
+ verify(offloader, never())
+ .readOffloaded(anyLong(), any(), anyMap());
+
+ // Delete offladed message from bookkeeper
+ assertEventuallyTrue(() ->
bkc.getLedgers().contains(firstLedger.getLedgerId()));
+ assertEventuallyTrue(() ->
bkc.getLedgers().contains(secondLedger.getLedgerId()));
+ clock.advance(6, TimeUnit.MINUTES);
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ // assert bk ledger is deleted
+ assertEventuallyTrue(() ->
!bkc.getLedgers().contains(firstLedger.getLedgerId()));
+ assertEventuallyTrue(() ->
!bkc.getLedgers().contains(secondLedger.getLedgerId()));
+
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
+
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
+
+ for (Entry e : cursor.readEntries(10)) {
+ Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ // Ledgers deleted from bookkeeper, now should read from offloader
+ verify(offloader, atLeastOnce())
+ .readOffloaded(anyLong(), any(), anyMap());
+ verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID),
anyMap());
+
}
+
static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new
ConcurrentHashMap<UUID, ReadHandle>();
+
+ OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "",
"",
+ null, null,
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
+
+
@Override
public String getOffloadDriverName() {
return "mock";
@@ -150,7 +239,7 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
@Override
public OffloadPolicies getOffloadPolicies() {
- return null;
+ return offloadPolicies;
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 807876e..85543c5 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -23,22 +23,19 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.ImmutableSet;
-
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
@@ -50,11 +47,9 @@ import org.apache.bookkeeper.mledger.Position;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.testng.annotations.Test;
public class OffloadPrefixTest extends MockedBookKeeperTestCase {
@@ -995,7 +990,8 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
- OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
+ OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
@Override
public String getOffloadDriverName() {
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1482835..ea457ab 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -21,9 +21,7 @@ package org.apache.pulsar.broker;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.netty.util.internal.PlatformDependent;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -35,14 +33,15 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
-import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
/**
@@ -1459,17 +1458,22 @@ public class ServiceConfiguration implements
PulsarConfiguration {
+ "Of course, this may degrade consumption throughput.
Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "Read priority when ledgers exists in both bookkeeper and
the second layer storage.")
+ private String managedLedgerDataReadPriority =
OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST
+ .getValue();
+
/*** --- Load balancer --- ****/
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
- doc = "Enable load balancer"
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable load balancer"
)
private boolean loadBalancerEnabled = true;
@Deprecated
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
- deprecated = true,
- doc = "load placement
strategy[weightedRandomSelection/leastLoadedServer] (only used by
SimpleLoadManagerImpl)"
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "load placement
strategy[weightedRandomSelection/leastLoadedServer] (only used by
SimpleLoadManagerImpl)"
)
private String loadBalancerPlacementStrategy = "leastLoadedServer"; //
weighted random selection
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 7aa0279..6644d8c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -27,14 +27,11 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-
import com.google.common.collect.Sets;
-
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -165,10 +162,11 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
String endpoint = "test-endpoint";
long offloadThresholdInBytes = 0;
long offloadDeletionLagInMillis = 100L;
+ OffloadPolicies.OffloadedReadPriority priority =
OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;
OffloadPolicies offload1 = OffloadPolicies.create(
driver, region, bucket, endpoint, null, null,
- 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
+ 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis,
priority);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 =
admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
@@ -214,7 +212,7 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
Thread.sleep(2000);
testOffload(true);
}
-
+
@Test
public void testTopicLevelOffloadNonPartitioned() throws Exception {
//wait for cache init
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d6f9a5c..041539e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -35,7 +35,6 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -1274,7 +1273,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
-
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-1));
@@ -1289,7 +1289,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
-
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(100));
@@ -1303,7 +1304,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
-
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-2));
@@ -1317,7 +1319,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
-
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+
pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-1));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 258c1234..bc9e4bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -22,7 +22,6 @@ import static
com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -31,7 +30,6 @@ import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -64,6 +62,7 @@ public class ServiceConfigurationTest {
assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(),
"topic_count_equally_divide");
assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1);
assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
+ assertEquals(config.getManagedLedgerDataReadPriority(),
"bookkeeper-first");
}
@Test
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 08f7a9c..a21d92d 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -74,8 +74,9 @@ managedLedgerMaxEntriesPerLedger=50000
managedLedgerMinLedgerRolloverTimeMinutes=10
managedLedgerMaxLedgerRolloverTimeMinutes=240
managedLedgerCursorMaxEntriesPerLedger=50000
-managedLedgerCursorRolloverTimeInSeconds=14400
-loadBalancerEnabled=false
+managedLedgerCursorRolloverTimeInSeconds = 14400
+managedLedgerDataReadPriority = bookkeeper-first
+loadBalancerEnabled = false
loadBalancerReportUpdateThresholdPercentage=10
loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1
diff --git a/pulsar-broker/src/test/resources/logback.xml
b/pulsar-broker/src/test/resources/logback.xml
index bf3daa8..f5735b0 100644
--- a/pulsar-broker/src/test/resources/logback.xml
+++ b/pulsar-broker/src/test/resources/logback.xml
@@ -19,7 +19,7 @@
-->
<configuration scan="true">
-<!--
+<!--
<logger name="org.apache" level="IN" />
<logger name="org.apache.bookkeeper.mledger" level="ERROR" />
-->
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 68793ba..d74184f 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -544,11 +544,12 @@ public class PulsarAdminToolTest {
namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
- namespaces.run(split("set-offload-policies myprop/clust/ns1 -r
test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M
-oat 10M -oae 10s"));
+ namespaces.run(split(
+ "set-offload-policies myprop/clust/ns1 -r test-region -d
aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae
10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", null, null, 32 * 1024 * 1024,
5 * 1024 * 1024,
- 10 * 1024 * 1024L, 10000L));
+ 10 * 1024 * 1024L, 10000L,
OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
@@ -763,9 +764,10 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-offload-policies
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
- cmdTopics.run(split("set-offload-policies
persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8
-rb 9 -t 10"));
+ cmdTopics.run(split(
+ "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3
-r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3",
"region", "bucket"
- , "endpoint", null, null, 8, 9, 10L, null);
+ , "endpoint", null, null, 8, 9, 10L, null,
OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
offloadPolicies);
cmdTopics.run(split("get-max-unacked-messages-on-consumer
persistent://myprop/clust/ns1/ds1"));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d755571..2065cd5 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -23,15 +23,15 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.CommaParameterSplitter;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -42,11 +42,12 @@ import
org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import
org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1650,7 +1651,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--driver", "-d"},
description = "Driver to use to offload old data to long term
storage, " +
- "(Possible values: S3, aws-s3, google-cloud-storage)",
+ "(Possible values: S3, aws-s3, google-cloud-storage,
filesystem, azureblob)",
required = true)
private String driver;
@@ -1710,17 +1711,24 @@ public class CmdNamespaces extends CmdBase {
required = false)
private String offloadAfterThresholdStr;
- private final String[] DRIVER_NAMES = {"S3", "aws-s3",
"google-cloud-storage"};
+ @Parameter(
+ names = {"--offloadedReadPriority", "-orp"},
+ description = "read priority for offloaded messages",
+ required = false
+ )
+ private String offloadReadPriorityStr;
+
+ public final ImmutableList<String> DRIVER_NAMES =
OffloadPolicies.DRIVER_NAMES;
public boolean driverSupported(String driver) {
- return Arrays.stream(DRIVER_NAMES).anyMatch(d ->
d.equalsIgnoreCase(driver));
+ return DRIVER_NAMES.stream().anyMatch(d ->
d.equalsIgnoreCase(driver));
}
public boolean isS3Driver(String driver) {
if (StringUtils.isEmpty(driver)) {
return false;
}
- return driver.equalsIgnoreCase(DRIVER_NAMES[0]) ||
driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+ return driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) ||
driver.equalsIgnoreCase(DRIVER_NAMES.get(1));
}
public boolean positiveCheck(String paramName, long value) {
@@ -1744,7 +1752,7 @@ public class CmdNamespaces extends CmdBase {
if (!driverSupported(driver)) {
throw new ParameterException(
"The driver " + driver + " is not supported, " +
- "(Possible values: S3, aws-s3,
google-cloud-storage).");
+ "(Possible values: " + String.join(",",
DRIVER_NAMES) + ").");
}
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) &&
Strings.isNullOrEmpty(endpoint)) {
@@ -1788,10 +1796,24 @@ public class CmdNamespaces extends CmdBase {
offloadAfterThresholdInBytes = offloadAfterThreshold;
}
}
+ OffloadedReadPriority offloadedReadPriority =
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+ if (this.offloadReadPriorityStr != null) {
+ try {
+ offloadedReadPriority =
OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+ } catch (Exception e) {
+ throw new ParameterException("--offloadedReadPriority
parameter must be one of " +
+ Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + this.offloadReadPriorityStr, e);
+ }
+ }
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, awsId, awsSecret,
maxBlockSizeInBytes, readBufferSizeInBytes,
offloadAfterThresholdInBytes,
- offloadAfterElapsedInMillis);
+ offloadAfterElapsedInMillis, offloadedReadPriority);
+
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 8f55e3d..8488e17 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -35,6 +35,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -49,6 +50,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import
org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1336,11 +1338,34 @@ public class CmdTopics extends CmdBase {
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
+ @Parameter(
+ names = {"--offloadedReadPriority", "-orp"},
+ description = "read priority for offloaded messages",
+ required = false
+ )
+ private String offloadReadPriorityStr;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+
+ OffloadedReadPriority offloadedReadPriority =
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+ if (this.offloadReadPriorityStr != null) {
+ try {
+ offloadedReadPriority =
OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+ } catch (Exception e) {
+ throw new ParameterException("--offloadedReadPriority
parameter must be one of " +
+ Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + this.offloadReadPriorityStr, e);
+ }
+ }
+
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver,
region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
- , readBufferSizeInBytes, offloadThresholdInBytes,
offloadDeletionLagInMillis);
+ , readBufferSizeInBytes, offloadThresholdInBytes,
offloadDeletionLagInMillis, offloadedReadPriority);
+
admin.topics().setOffloadPolicies(persistentTopic,
offloadPolicies);
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index da9db18..6620081 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.common.policies.data;
import static org.apache.pulsar.common.util.FieldParser.value;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -31,9 +32,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
/**
* Definition of the offload policies.
@@ -42,9 +46,58 @@ import org.apache.commons.lang3.StringUtils;
@Data
public class OffloadPolicies implements Serializable {
+ @InterfaceAudience.Public
+ @InterfaceStability.Stable
+ public enum OffloadedReadPriority {
+ /**
+ * For offloaded messages, readers will try to read from bookkeeper at
first,
+ * if messages not exist at bookkeeper then read from offloaded
storage.
+ */
+ BOOKKEEPER_FIRST("bookkeeper-first"),
+ /**
+ * For offloaded messages, readers will try to read from offloaded
storage first,
+ * even they are still exist in bookkeeper.
+ */
+ TIERED_STORAGE_FIRST("tiered-storage-first");
+
+ private final String value;
+
+ OffloadedReadPriority(String value) {
+ this.value = value;
+ }
+
+ public boolean equalsName(String otherName) {
+ return value.equals(otherName);
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static OffloadedReadPriority fromString(String str) {
+ for (OffloadedReadPriority value : OffloadedReadPriority.values())
{
+ if (value.value.equals(str)) {
+ return value;
+ }
+ }
+
+ throw new IllegalArgumentException("--offloadedReadPriority
parameter must be one of "
+ + Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + str);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
private final static long serialVersionUID = 0L;
private final static List<Field> CONFIGURATION_FIELDS;
+
static {
CONFIGURATION_FIELDS = new ArrayList<>();
Class<OffloadPolicies> clazz = OffloadPolicies.class;
@@ -60,9 +113,8 @@ public class OffloadPolicies implements Serializable {
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;
// 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
- final static String[] DRIVER_NAMES = {
- "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob",
"aliyun-oss"
- };
+ public final static ImmutableList<String> DRIVER_NAMES = ImmutableList
+ .of("S3", "aws-s3", "google-cloud-storage", "filesystem",
"azureblob", "aliyun-oss");
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
@@ -70,6 +122,7 @@ public class OffloadPolicies implements Serializable {
public final static String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
public final static String DELETION_LAG_NAME_IN_CONF_FILE =
"managedLedgerOffloadDeletionLagMs";
+ public final static OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY
= OffloadedReadPriority.TIERED_STORAGE_FIRST;
// common config
@Configuration
@@ -84,6 +137,8 @@ public class OffloadPolicies implements Serializable {
private Long managedLedgerOffloadThresholdInBytes =
DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
@Configuration
private Long managedLedgerOffloadDeletionLagInMillis =
DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+ @Configuration
+ private OffloadedReadPriority managedLedgerOffloadedReadPriority =
DEFAULT_OFFLOADED_READ_PRIORITY;
// s3 config, set by service configuration or cli
@Configuration
@@ -137,7 +192,8 @@ public class OffloadPolicies implements Serializable {
public static OffloadPolicies create(String driver, String region, String
bucket, String endpoint,
String credentialId, String
credentialSecret,
Integer maxBlockSizeInBytes, Integer
readBufferSizeInBytes,
- Long offloadThresholdInBytes, Long
offloadDeletionLagInMillis) {
+ Long offloadThresholdInBytes, Long
offloadDeletionLagInMillis,
+ OffloadedReadPriority readPriority) {
OffloadPolicies offloadPolicies = new OffloadPolicies();
offloadPolicies.setManagedLedgerOffloadDriver(driver);
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
@@ -148,8 +204,9 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint);
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
+ offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority);
- if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) ||
driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
+ if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) ||
driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
if (credentialId != null) {
offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
}
@@ -161,7 +218,7 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
offloadPolicies.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
offloadPolicies.setS3ManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
- } else if (driver.equalsIgnoreCase(DRIVER_NAMES[2])) {
+ } else if (driver.equalsIgnoreCase(DRIVER_NAMES.get(2))) {
offloadPolicies.setGcsManagedLedgerOffloadRegion(region);
offloadPolicies.setGcsManagedLedgerOffloadBucket(bucket);
offloadPolicies.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
@@ -204,7 +261,7 @@ public class OffloadPolicies implements Serializable {
}
public boolean driverSupported() {
- return Arrays.stream(DRIVER_NAMES).anyMatch(d ->
d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
+ return DRIVER_NAMES.stream().anyMatch(d ->
d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
}
public static String getSupportedDriverNames() {
@@ -215,22 +272,22 @@ public class OffloadPolicies implements Serializable {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[0])
- ||
managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[1]);
+ return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(0))
+ ||
managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(1));
}
public boolean isGcsDriver() {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[2]);
+ return
managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(2));
}
public boolean isFileSystemDriver() {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[3]);
+ return
managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(3));
}
public boolean bucketValid() {
@@ -253,6 +310,7 @@ public class OffloadPolicies implements Serializable {
@Override
public int hashCode() {
return Objects.hash(
+ managedLedgerOffloadedReadPriority,
managedLedgerOffloadDriver,
managedLedgerOffloadMaxThreads,
managedLedgerOffloadPrefetchRounds,
@@ -288,17 +346,18 @@ public class OffloadPolicies implements Serializable {
return false;
}
OffloadPolicies other = (OffloadPolicies) obj;
- return Objects.equals(managedLedgerOffloadDriver,
other.getManagedLedgerOffloadDriver())
+ return Objects.equals(managedLedgerOffloadedReadPriority,
other.getManagedLedgerOffloadedReadPriority())
+ && Objects.equals(managedLedgerOffloadDriver,
other.getManagedLedgerOffloadDriver())
&& Objects.equals(managedLedgerOffloadMaxThreads,
other.getManagedLedgerOffloadMaxThreads())
&& Objects.equals(managedLedgerOffloadPrefetchRounds,
other.getManagedLedgerOffloadPrefetchRounds())
&& Objects.equals(managedLedgerOffloadThresholdInBytes,
- other.getManagedLedgerOffloadThresholdInBytes())
+ other.getManagedLedgerOffloadThresholdInBytes())
&& Objects.equals(managedLedgerOffloadDeletionLagInMillis,
- other.getManagedLedgerOffloadDeletionLagInMillis())
+ other.getManagedLedgerOffloadDeletionLagInMillis())
&& Objects.equals(s3ManagedLedgerOffloadRegion,
other.getS3ManagedLedgerOffloadRegion())
&& Objects.equals(s3ManagedLedgerOffloadBucket,
other.getS3ManagedLedgerOffloadBucket())
&& Objects.equals(s3ManagedLedgerOffloadServiceEndpoint,
- other.getS3ManagedLedgerOffloadServiceEndpoint())
+ other.getS3ManagedLedgerOffloadServiceEndpoint())
&& Objects.equals(s3ManagedLedgerOffloadMaxBlockSizeInBytes,
other.getS3ManagedLedgerOffloadMaxBlockSizeInBytes())
&& Objects.equals(s3ManagedLedgerOffloadReadBufferSizeInBytes,
@@ -328,6 +387,7 @@ public class OffloadPolicies implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
+ .add("managedLedgerOffloadedReadPriority",
managedLedgerOffloadedReadPriority)
.add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
.add("managedLedgerOffloadMaxThreads",
managedLedgerOffloadMaxThreads)
.add("managedLedgerOffloadPrefetchRounds",
managedLedgerOffloadPrefetchRounds)
@@ -358,7 +418,7 @@ public class OffloadPolicies implements Serializable {
public Properties toProperties() {
Properties properties = new Properties();
-
+ setProperty(properties, "managedLedgerOffloadedReadPriority",
this.getManagedLedgerOffloadedReadPriority());
setProperty(properties, "offloadersDirectory",
this.getOffloadersDirectory());
setProperty(properties, "managedLedgerOffloadDriver",
this.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index d87887d..a89409c 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.policies.data;
import java.util.Properties;
+import
org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -59,7 +60,8 @@ public class OffloadPoliciesTest {
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
- offloadDeletionLagInMillis
+ offloadDeletionLagInMillis,
+ OffloadedReadPriority.TIERED_STORAGE_FIRST
);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(),
driver);
@@ -86,6 +88,7 @@ public class OffloadPoliciesTest {
final Integer readBufferSizeInBytes = 2 * M;
final Long offloadThresholdInBytes = 0L;
final Long offloadDeletionLagInMillis = 5 * MIN;
+ final OffloadedReadPriority readPriority =
OffloadedReadPriority.TIERED_STORAGE_FIRST;
OffloadPolicies offloadPolicies = OffloadPolicies.create(
driver,
@@ -97,7 +100,8 @@ public class OffloadPoliciesTest {
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
- offloadDeletionLagInMillis
+ offloadDeletionLagInMillis,
+ readPriority
);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(),
driver);
@@ -107,6 +111,7 @@ public class OffloadPoliciesTest {
Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadReadBufferSizeInBytes(),
readBufferSizeInBytes);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
offloadThresholdInBytes);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
offloadDeletionLagInMillis);
+
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(),
readPriority);
}
@Test
diff --git a/site2/docs/cookbooks-tiered-storage.md
b/site2/docs/cookbooks-tiered-storage.md
index 6dd2803..0263ec2 100644
--- a/site2/docs/cookbooks-tiered-storage.md
+++ b/site2/docs/cookbooks-tiered-storage.md
@@ -262,6 +262,16 @@ $ bin/pulsar-admin namespaces set-offload-threshold --size
10M my-tenant/my-name
> Automatic offload runs when a new segment is added to a topic log. If you
> set the threshold on a namespace, but few messages are being produced to the
> topic, offload will not until the current segment is full.
+## Configuring read priority for offloaded messages
+
+By default, once messages were offloaded to long term storage, brokers will
read them from long term storage, but messages still exists in bookkeeper for a
period depends on the administrator's configuration. For
+messages exists in both bookkeeper and long term storage, if they are
preferred to read from bookkeeper, you can use command to change this
configuration.
+
+```bash
+# default value for -orp is tiered-storage-first
+$ bin/pulsar-admin namespaces set-offload-policies my-tenant/my-namespace -orp
bookkeeper-first
+$ bin/pulsar-admin topics set-offload-policies my-tenant/my-namespace/topic1
-orp bookkeeper-first
+```
## Triggering offload manually