This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 36d85ea9564 [fix] [broker] Fix system topic can not be loaded up if it
contains data offloaded (#23279)
36d85ea9564 is described below
commit 36d85ea9564a49988012243b5458b75735dce7bc
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 11 19:33:47 2024 +0800
[fix] [broker] Fix system topic can not be loaded up if it contains data
offloaded (#23279)
(cherry picked from commit fc0e4e3fe0fa14d9ac1361871edc53957625fe29)
---
.../apache/bookkeeper/mledger/LedgerOffloader.java | 4 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 28 +++--
...ader.java => NonAppendableLedgerOffloader.java} | 33 +++---
.../mledger/impl/NullLedgerOffloader.java | 5 +
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../mledger/impl/OffloadPrefixReadTest.java | 114 +++++++++++++++++++--
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 2 +-
.../bookkeeper/test/MockedBookKeeperTestCase.java | 8 +-
.../pulsar/broker/service/BrokerService.java | 41 ++++----
.../pulsar/broker/admin/AdminApiOffloadTest.java | 1 +
.../pulsar/broker/service/BrokerServiceTest.java | 21 +++-
11 files changed, 198 insertions(+), 61 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 868a8e42653..11148ef1a59 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -230,5 +230,9 @@ public interface LedgerOffloader {
Map<String, String> offloadDriverMetadata) throws
ManagedLedgerException {
throw ManagedLedgerException.getManagedLedgerException(new
UnsupportedOperationException());
}
+
+ default boolean isAppendable() {
+ return true;
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3f327152586..7dfe8d4fb03 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -94,6 +94,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -2484,8 +2485,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
public void maybeOffloadInBackground(CompletableFuture<PositionImpl>
promise) {
- if (config.getLedgerOffloader() == null || config.getLedgerOffloader()
== NullLedgerOffloader.INSTANCE
- || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ if (getOffloadPoliciesIfAppendable().isEmpty()) {
return;
}
@@ -2501,8 +2501,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
private void maybeOffload(long offloadThresholdInBytes, long
offloadThresholdInSeconds,
CompletableFuture<PositionImpl> finalPromise) {
- if (config.getLedgerOffloader() == null || config.getLedgerOffloader()
== NullLedgerOffloader.INSTANCE
- || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ if (getOffloadPoliciesIfAppendable().isEmpty()) {
String msg = String.format("[%s] Nothing to offload due to
offloader or offloadPolicies is NULL", name);
finalPromise.completeExceptionally(new
IllegalArgumentException(msg));
return;
@@ -2604,6 +2603,16 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
internalTrimLedgers(false, promise);
}
+ private Optional<OffloadPolicies> getOffloadPoliciesIfAppendable() {
+ LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+ if (ledgerOffloader == null
+ || !ledgerOffloader.isAppendable()
+ || ledgerOffloader.getOffloadPolicies() == null) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
+ }
+
void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise)
{
if (!factory.isMetadataServiceAvailable()) {
// Defer trimming of ledger if we cannot connect to metadata
service
@@ -2619,10 +2628,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
List<LedgerInfo> ledgersToDelete = new ArrayList<>();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
- Optional<OffloadPolicies> optionalOffloadPolicies =
Optional.ofNullable(config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
- ? config.getLedgerOffloader().getOffloadPolicies()
- : null);
+ Optional<OffloadPolicies> optionalOffloadPolicies =
getOffloadPoliciesIfAppendable();
synchronized (this) {
if (log.isDebugEnabled()) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={}
totalSize={}", name, ledgers.keySet(),
@@ -3141,8 +3147,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback,
Object ctx) {
- if (config.getLedgerOffloader() != null && config.getLedgerOffloader()
== NullLedgerOffloader.INSTANCE) {
- callback.offloadFailed(new
ManagedLedgerException("NullLedgerOffloader"), ctx);
+ LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+ if (ledgerOffloader != null && !ledgerOffloader.isAppendable()) {
+ String msg = String.format("[%s] does not support offload",
ledgerOffloader.getClass().getSimpleName());
+ callback.offloadFailed(new ManagedLedgerException(msg), ctx);
return;
}
PositionImpl requestOffloadTo = (PositionImpl) pos;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
similarity index 70%
copy from
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
copy to
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
index 938ceb0c7df..f3001ec8050 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
@@ -24,50 +24,51 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
-/**
- * Null implementation that throws an error on any invokation.
- */
-public class NullLedgerOffloader implements LedgerOffloader {
- public static final NullLedgerOffloader INSTANCE = new
NullLedgerOffloader();
+public class NonAppendableLedgerOffloader implements LedgerOffloader {
+ private LedgerOffloader delegate;
+
+ public NonAppendableLedgerOffloader(LedgerOffloader delegate) {
+ this.delegate = delegate;
+ }
@Override
public String getOffloadDriverName() {
- return "NullLedgerOffloader";
+ return delegate.getOffloadDriverName();
}
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Map<String, String> extraMetadata) {
- CompletableFuture<Void> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
Map<String, String>
offloadDriverMetadata) {
- CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return delegate.readOffloaded(ledgerId, uid, offloadDriverMetadata);
}
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String>
offloadDriverMetadata) {
- CompletableFuture<Void> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return delegate.deleteOffloaded(ledgerId, uid, offloadDriverMetadata);
}
@Override
public OffloadPolicies getOffloadPolicies() {
- return null;
+ return delegate.getOffloadPolicies();
}
@Override
public void close() {
+ delegate.close();
+ }
+ @Override
+ public boolean isAppendable() {
+ return false;
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index 938ceb0c7df..fe646bc82e5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -70,4 +70,9 @@ public class NullLedgerOffloader implements LedgerOffloader {
public void close() {
}
+
+ @Override
+ public boolean isAppendable() {
+ return false;
+ }
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4f521f1e99e..166cb468d0f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3848,7 +3848,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
config.setLedgerOffloader(ledgerOffloader);
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
- verify(ledgerOffloader, times(1)).getOffloadPolicies();
+ verify(ledgerOffloader, times(1)).isAppendable();
}
@Test(timeOut = 30000)
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index cd224e33e27..8ab844dcfae 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
@@ -54,18 +55,42 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
- @Test
- public void testOffloadRead() throws Exception {
+
+ private final String offloadTypeAppendable = "NonAppendable";
+
+ @Override
+ protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig
config) {
+ super.initManagedLedgerFactoryConfig(config);
+ // disable cache.
+ config.setMaxCacheSize(0);
+ }
+
+ @DataProvider(name = "offloadAndDeleteTypes")
+ public Object[][] offloadAndDeleteTypes() {
+ return new Object[][]{
+ {"normal", true},
+ {"normal", false},
+ {offloadTypeAppendable, true},
+ {offloadTypeAppendable, false},
+ };
+ }
+
+ @Test(dataProvider = "offloadAndDeleteTypes")
+ public void testOffloadRead(String offloadType, boolean deleteMl) throws
Exception {
MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
@@ -88,6 +113,10 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
+ if (offloadTypeAppendable.equals(offloadType)) {
+ config.setLedgerOffloader(new
NonAppendableLedgerOffloader(offloader));
+ }
+
UUID firstLedgerUUID = new
UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
UUID secondLedgerUUID = new
UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
@@ -115,13 +144,30 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
verify(offloader, times(2))
.readOffloaded(anyLong(), (UUID) any(), anyMap());
- ledger.close();
- // Ensure that all the read handles had been closed
- assertEquals(offloader.openedReadHandles.get(), 0);
+ if (!deleteMl) {
+ ledger.close();
+ // Ensure that all the read handles had been closed
+ assertEquals(offloader.openedReadHandles.get(), 0);
+ } else {
+ // Verify: the ledger offloaded will be deleted after managed
ledger is deleted.
+ ledger.delete();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(offloader.offloads.size() <= 1);
+ assertTrue(ledger.ledgers.size() <= 1);
+ });
+ }
}
- @Test
- public void testBookkeeperFirstOffloadRead() throws Exception {
+ @DataProvider(name = "offloadTypes")
+ public Object[][] offloadTypes() {
+ return new Object[][]{
+ {"normal"},
+ {offloadTypeAppendable},
+ };
+ }
+
+ @Test(dataProvider = "offloadTypes")
+ public void testBookkeeperFirstOffloadRead(String offloadType) throws
Exception {
MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
MockClock clock = new MockClock();
offloader.getOffloadPolicies()
@@ -186,6 +232,10 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
+ if (offloadTypeAppendable.equals(offloadType)) {
+ config.setLedgerOffloader(new
NonAppendableLedgerOffloader(offloader));
+ }
+
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
@@ -195,6 +245,56 @@ public class OffloadPrefixReadTest extends
MockedBookKeeperTestCase {
.readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID),
anyMap());
+ // Verify: the ledger offloaded will be trimmed after if no backlog.
+ while (cursor.hasMoreEntries()) {
+ cursor.readEntries(1);
+ }
+ config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setRetentionSizeInMB(0);
+ CompletableFuture trimFuture = new CompletableFuture();
+ ledger.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.join();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(offloader.offloads.size() <= 1);
+ assertTrue(ledger.ledgers.size() <= 1);
+ });
+
+ // cleanup.
+ ledger.delete();
+ }
+
+
+
+ @Test
+ public void testSkipOffloadIfReadOnly() throws Exception {
+ LedgerOffloader ol = new
NonAppendableLedgerOffloader(spy(MockLedgerOffloader.class));
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(ol);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < 25; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ try {
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+ } catch (ManagedLedgerException mle) {
+ assertTrue(mle.getMessage().contains("does not support offload"));
+ }
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
Assert.assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+
Assert.assertFalse(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
+
+ // cleanup.
+ ledger.delete();
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 2cdb14fb71e..53fa1725585 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -94,7 +94,7 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
ledger.offloadPrefix(p);
fail("Should have thrown an exception");
} catch (ManagedLedgerException e) {
- assertEquals(e.getMessage(), "NullLedgerOffloader");
+ assertTrue(e.getMessage().contains("does not support offload"));
}
assertEquals(ledger.getLedgersInfoAsList().size(), 5);
assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 645563eb78c..c7685cfaa65 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -83,13 +83,17 @@ public abstract class MockedBookKeeperTestCase {
}
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new
ManagedLedgerFactoryConfig();
- // increase default cache eviction interval so that caching could be
tested with less flakyness
- managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
+ initManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
setUpTestCase();
}
+ protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig
config) {
+ // increase default cache eviction interval so that caching could be
tested with less flakyness
+ config.setCacheEvictionIntervalMs(200);
+ }
+
protected void setUpTestCase() throws Exception {
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5ce9519d9c0..959c0043aaf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -87,7 +87,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -1903,29 +1903,26 @@ public class BrokerService implements Closeable {
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies,
policies.orElse(null)),
getPulsar().getConfig().getProperties());
- if (NamespaceService.isSystemServiceNamespace(namespace.toString())
- || SystemTopicNames.isSystemTopic(topicName)) {
- /*
- Avoid setting broker internal system topics using off-loader
because some of them are the
- preconditions of other topics. The slow replying log speed
will cause a delay in all the topic
- loading.(timeout)
- */
-
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
-
pulsar().createManagedLedgerOffloader(offloadPolicies);
-
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- } else {
- //If the topic level policy is null, use the namespace
level
- managedLedgerConfig
-
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
+
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig
+
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace,
offloadPolicies));
}
+ if (managedLedgerConfig.getLedgerOffloader() != null
+ && managedLedgerConfig.getLedgerOffloader().isAppendable()
+ &&
(NamespaceService.isSystemServiceNamespace(namespace.toString())
+ || SystemTopicNames.isSystemTopic(topicName))) {
+ managedLedgerConfig.setLedgerOffloader(
+ new
NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader()));
+ }
+
managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 95b0d48c69a..03f358f2bc7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -114,6 +114,7 @@ public class AdminApiOffloadTest extends
MockedPulsarServiceBaseTest {
CompletableFuture<Void> promise = new CompletableFuture<>();
doReturn(promise).when(offloader).offload(any(), any(), any());
+ doReturn(true).when(offloader).isAppendable();
MessageId currentId = MessageId.latest;
try (Producer<byte[]> p =
pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1279b97a675..e1b42303f74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -77,6 +77,7 @@ import
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -1910,6 +1911,10 @@ public class BrokerServiceTest extends BrokerTestBase {
final String namespace = "prop/" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
+ Awaitility.await().untilAsserted(() -> {
+ OffloadPolicies policiesGot =
admin.namespaces().getOffloadPolicies(namespace);
+ assertNotNull(policiesGot);
+ });
// Inject the cache to avoid real load off-loader jar
final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap =
pulsar.getLedgerOffloaderMap();
@@ -1923,8 +1928,20 @@ public class BrokerServiceTest extends BrokerTestBase {
// (2) test system topic
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
- managedLedgerConfig =
brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
- Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(),
NullLedgerOffloader.INSTANCE);
+ boolean offloadPoliciesExists = false;
+ try {
+ OffloadPolicies policiesGot =
+
admin.namespaces().getOffloadPolicies(TopicName.get(eventTopicName).getNamespace());
+ offloadPoliciesExists = policiesGot != null;
+ } catch (PulsarAdminException.NotFoundException notFoundException)
{
+ offloadPoliciesExists = false;
+ }
+ var managedLedgerConfig2 =
brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
+ if (offloadPoliciesExists) {
+ Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader()
instanceof NonAppendableLedgerOffloader);
+ } else {
+ Assert.assertEquals(managedLedgerConfig2.getLedgerOffloader(),
NullLedgerOffloader.INSTANCE);
+ }
}
}
}