This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 36d85ea9564 [fix] [broker] Fix system topic can not be loaded up if it 
contains data offloaded (#23279)
36d85ea9564 is described below

commit 36d85ea9564a49988012243b5458b75735dce7bc
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 11 19:33:47 2024 +0800

    [fix] [broker] Fix system topic can not be loaded up if it contains data 
offloaded (#23279)
    
    (cherry picked from commit fc0e4e3fe0fa14d9ac1361871edc53957625fe29)
---
 .../apache/bookkeeper/mledger/LedgerOffloader.java |   4 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  28 +++--
 ...ader.java => NonAppendableLedgerOffloader.java} |  33 +++---
 .../mledger/impl/NullLedgerOffloader.java          |   5 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   2 +-
 .../mledger/impl/OffloadPrefixReadTest.java        | 114 +++++++++++++++++++--
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |   2 +-
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |   8 +-
 .../pulsar/broker/service/BrokerService.java       |  41 ++++----
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   1 +
 .../pulsar/broker/service/BrokerServiceTest.java   |  21 +++-
 11 files changed, 198 insertions(+), 61 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 868a8e42653..11148ef1a59 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -230,5 +230,9 @@ public interface LedgerOffloader {
                              Map<String, String> offloadDriverMetadata) throws 
ManagedLedgerException {
         throw ManagedLedgerException.getManagedLedgerException(new 
UnsupportedOperationException());
     }
+
+    default boolean isAppendable() {
+        return true;
+    }
 }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3f327152586..7dfe8d4fb03 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -94,6 +94,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -2484,8 +2485,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     public void maybeOffloadInBackground(CompletableFuture<PositionImpl> 
promise) {
-        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
-                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+        if (getOffloadPoliciesIfAppendable().isEmpty()) {
             return;
         }
 
@@ -2501,8 +2501,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     private void maybeOffload(long offloadThresholdInBytes, long 
offloadThresholdInSeconds,
                               CompletableFuture<PositionImpl> finalPromise) {
-        if (config.getLedgerOffloader() == null || config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE
-                || config.getLedgerOffloader().getOffloadPolicies() == null) {
+        if (getOffloadPoliciesIfAppendable().isEmpty()) {
             String msg = String.format("[%s] Nothing to offload due to 
offloader or offloadPolicies is NULL", name);
             finalPromise.completeExceptionally(new 
IllegalArgumentException(msg));
             return;
@@ -2604,6 +2603,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         internalTrimLedgers(false, promise);
     }
 
+    private Optional<OffloadPolicies> getOffloadPoliciesIfAppendable() {
+        LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+        if (ledgerOffloader == null
+                || !ledgerOffloader.isAppendable()
+                || ledgerOffloader.getOffloadPolicies() == null) {
+            return Optional.empty();
+        }
+        return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
+    }
+
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) 
{
         if (!factory.isMetadataServiceAvailable()) {
             // Defer trimming of ledger if we cannot connect to metadata 
service
@@ -2619,10 +2628,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         List<LedgerInfo> ledgersToDelete = new ArrayList<>();
         List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
-        Optional<OffloadPolicies> optionalOffloadPolicies = 
Optional.ofNullable(config.getLedgerOffloader() != null
-                && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
-                ? config.getLedgerOffloader().getOffloadPolicies()
-                : null);
+        Optional<OffloadPolicies> optionalOffloadPolicies = 
getOffloadPoliciesIfAppendable();
         synchronized (this) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Start TrimConsumedLedgers. ledgers={} 
totalSize={}", name, ledgers.keySet(),
@@ -3141,8 +3147,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, 
Object ctx) {
-        if (config.getLedgerOffloader() != null && config.getLedgerOffloader() 
== NullLedgerOffloader.INSTANCE) {
-            callback.offloadFailed(new 
ManagedLedgerException("NullLedgerOffloader"), ctx);
+        LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+        if (ledgerOffloader != null && !ledgerOffloader.isAppendable()) {
+            String msg = String.format("[%s] does not support offload", 
ledgerOffloader.getClass().getSimpleName());
+            callback.offloadFailed(new ManagedLedgerException(msg), ctx);
             return;
         }
         PositionImpl requestOffloadTo = (PositionImpl) pos;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
similarity index 70%
copy from 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
copy to 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
index 938ceb0c7df..f3001ec8050 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
@@ -24,50 +24,51 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 
-/**
- * Null implementation that throws an error on any invokation.
- */
-public class NullLedgerOffloader implements LedgerOffloader {
-    public static final NullLedgerOffloader INSTANCE = new 
NullLedgerOffloader();
+public class NonAppendableLedgerOffloader implements LedgerOffloader {
+    private LedgerOffloader delegate;
+
+    public NonAppendableLedgerOffloader(LedgerOffloader delegate) {
+        this.delegate = delegate;
+    }
 
     @Override
     public String getOffloadDriverName() {
-        return "NullLedgerOffloader";
+        return delegate.getOffloadDriverName();
     }
 
     @Override
     public CompletableFuture<Void> offload(ReadHandle ledger,
                                            UUID uid,
                                            Map<String, String> extraMetadata) {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
     public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
                                                        Map<String, String> 
offloadDriverMetadata) {
-        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return delegate.readOffloaded(ledgerId, uid, offloadDriverMetadata);
     }
 
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                                    Map<String, String> 
offloadDriverMetadata) {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return delegate.deleteOffloaded(ledgerId, uid, offloadDriverMetadata);
     }
 
     @Override
     public OffloadPolicies getOffloadPolicies() {
-        return null;
+        return delegate.getOffloadPolicies();
     }
 
     @Override
     public void close() {
+        delegate.close();
+    }
 
+    @Override
+    public boolean isAppendable() {
+        return false;
     }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index 938ceb0c7df..fe646bc82e5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -70,4 +70,9 @@ public class NullLedgerOffloader implements LedgerOffloader {
     public void close() {
 
     }
+
+    @Override
+    public boolean isAppendable() {
+        return false;
+    }
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4f521f1e99e..166cb468d0f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3848,7 +3848,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         config.setLedgerOffloader(ledgerOffloader);
 
         ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
-        verify(ledgerOffloader, times(1)).getOffloadPolicies();
+        verify(ledgerOffloader, times(1)).isAppendable();
     }
 
     @Test(timeOut = 30000)
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index cd224e33e27..8ab844dcfae 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 
 import java.util.ArrayList;
@@ -54,18 +55,42 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.MockClock;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
-    @Test
-    public void testOffloadRead() throws Exception {
+
+    private final String offloadTypeAppendable = "NonAppendable";
+
+    @Override
+    protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig 
config) {
+        super.initManagedLedgerFactoryConfig(config);
+        // disable cache.
+        config.setMaxCacheSize(0);
+    }
+
+    @DataProvider(name = "offloadAndDeleteTypes")
+    public Object[][] offloadAndDeleteTypes() {
+        return new Object[][]{
+                {"normal", true},
+                {"normal", false},
+                {offloadTypeAppendable, true},
+                {offloadTypeAppendable, false},
+        };
+    }
+
+    @Test(dataProvider = "offloadAndDeleteTypes")
+    public void testOffloadRead(String offloadType, boolean deleteMl) throws 
Exception {
         MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setMaxEntriesPerLedger(10);
@@ -88,6 +113,10 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
         
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
 
+        if (offloadTypeAppendable.equals(offloadType)) {
+            config.setLedgerOffloader(new 
NonAppendableLedgerOffloader(offloader));
+        }
+
         UUID firstLedgerUUID = new 
UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
                 
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
         UUID secondLedgerUUID = new 
UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
@@ -115,13 +144,30 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         verify(offloader, times(2))
                 .readOffloaded(anyLong(), (UUID) any(), anyMap());
 
-        ledger.close();
-        // Ensure that all the read handles had been closed
-        assertEquals(offloader.openedReadHandles.get(), 0);
+        if (!deleteMl) {
+            ledger.close();
+            // Ensure that all the read handles had been closed
+            assertEquals(offloader.openedReadHandles.get(), 0);
+        } else {
+            // Verify: the ledger offloaded will be deleted after managed 
ledger is deleted.
+            ledger.delete();
+            Awaitility.await().untilAsserted(() -> {
+                assertTrue(offloader.offloads.size() <= 1);
+                assertTrue(ledger.ledgers.size() <= 1);
+            });
+        }
     }
 
-    @Test
-    public void testBookkeeperFirstOffloadRead() throws Exception {
+    @DataProvider(name = "offloadTypes")
+    public Object[][] offloadTypes() {
+        return new Object[][]{
+                {"normal"},
+                {offloadTypeAppendable},
+        };
+    }
+
+    @Test(dataProvider = "offloadTypes")
+    public void testBookkeeperFirstOffloadRead(String offloadType) throws 
Exception {
         MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
         MockClock clock = new MockClock();
         offloader.getOffloadPolicies()
@@ -186,6 +232,10 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
         
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
         
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
 
+        if (offloadTypeAppendable.equals(offloadType)) {
+            config.setLedgerOffloader(new 
NonAppendableLedgerOffloader(offloader));
+        }
+
         for (Entry e : cursor.readEntries(10)) {
             Assert.assertEquals(new String(e.getData()), "entry-" + i++);
         }
@@ -195,6 +245,56 @@ public class OffloadPrefixReadTest extends 
MockedBookKeeperTestCase {
                 .readOffloaded(anyLong(), (UUID) any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), 
anyMap());
 
+        // Verify: the ledger offloaded will be trimmed after if no backlog.
+        while (cursor.hasMoreEntries()) {
+            cursor.readEntries(1);
+        }
+        config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+        config.setRetentionSizeInMB(0);
+        CompletableFuture trimFuture = new CompletableFuture();
+        ledger.trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.join();
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(offloader.offloads.size() <= 1);
+            assertTrue(ledger.ledgers.size() <= 1);
+        });
+
+        // cleanup.
+        ledger.delete();
+    }
+
+
+
+    @Test
+    public void testSkipOffloadIfReadOnly() throws Exception {
+        LedgerOffloader ol = new 
NonAppendableLedgerOffloader(spy(MockLedgerOffloader.class));
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(ol);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my_test_ledger", config);
+
+        for (int i = 0; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        try {
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        } catch (ManagedLedgerException mle) {
+            assertTrue(mle.getMessage().contains("does not support offload"));
+        }
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+        
Assert.assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+        
Assert.assertFalse(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+        
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
+
+        // cleanup.
+        ledger.delete();
     }
 
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 2cdb14fb71e..53fa1725585 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -94,7 +94,7 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
             ledger.offloadPrefix(p);
             fail("Should have thrown an exception");
         } catch (ManagedLedgerException e) {
-            assertEquals(e.getMessage(), "NullLedgerOffloader");
+            assertTrue(e.getMessage().contains("does not support offload"));
         }
         assertEquals(ledger.getLedgersInfoAsList().size(), 5);
         assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 645563eb78c..c7685cfaa65 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -83,13 +83,17 @@ public abstract class MockedBookKeeperTestCase {
         }
 
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new 
ManagedLedgerFactoryConfig();
-        // increase default cache eviction interval so that caching could be 
tested with less flakyness
-        managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
+        initManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
         factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
 
         setUpTestCase();
     }
 
+    protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig 
config) {
+        // increase default cache eviction interval so that caching could be 
tested with less flakyness
+        config.setCacheEvictionIntervalMs(200);
+    }
+
     protected void setUpTestCase() throws Exception {
 
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5ce9519d9c0..959c0043aaf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -87,7 +87,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -1903,29 +1903,26 @@ public class BrokerService implements Closeable {
                     topicLevelOffloadPolicies,
                     
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, 
policies.orElse(null)),
                     getPulsar().getConfig().getProperties());
-            if (NamespaceService.isSystemServiceNamespace(namespace.toString())
-                || SystemTopicNames.isSystemTopic(topicName)) {
-                /*
-                 Avoid setting broker internal system topics using off-loader 
because some of them are the
-                 preconditions of other topics. The slow replying log speed 
will cause a delay in all the topic
-                 loading.(timeout)
-                 */
-                
managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
-            } else  {
-                if (topicLevelOffloadPolicies != null) {
-                    try {
-                        LedgerOffloader topicLevelLedgerOffLoader =
-                                
pulsar().createManagedLedgerOffloader(offloadPolicies);
-                        
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                    } catch (PulsarServerException e) {
-                        throw new RuntimeException(e);
-                    }
-                } else {
-                    //If the topic level policy is null, use the namespace 
level
-                    managedLedgerConfig
-                            
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
+            if (topicLevelOffloadPolicies != null) {
+                try {
+                    LedgerOffloader topicLevelLedgerOffLoader = 
pulsar().createManagedLedgerOffloader(offloadPolicies);
+                    
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                } catch (PulsarServerException e) {
+                    throw new RuntimeException(e);
                 }
+            } else {
+                //If the topic level policy is null, use the namespace level
+                managedLedgerConfig
+                        
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, 
offloadPolicies));
             }
+            if (managedLedgerConfig.getLedgerOffloader() != null
+                    && managedLedgerConfig.getLedgerOffloader().isAppendable()
+                    && 
(NamespaceService.isSystemServiceNamespace(namespace.toString())
+                            || SystemTopicNames.isSystemTopic(topicName))) {
+                managedLedgerConfig.setLedgerOffloader(
+                        new 
NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader()));
+            }
+
             
managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());
 
             managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 95b0d48c69a..03f358f2bc7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -114,6 +114,7 @@ public class AdminApiOffloadTest extends 
MockedPulsarServiceBaseTest {
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
         doReturn(promise).when(offloader).offload(any(), any(), any());
+        doReturn(true).when(offloader).isAppendable();
 
         MessageId currentId = MessageId.latest;
         try (Producer<byte[]> p = 
pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1279b97a675..e1b42303f74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -77,6 +77,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -1910,6 +1911,10 @@ public class BrokerServiceTest extends BrokerTestBase {
         final String namespace = "prop/" + UUID.randomUUID();
         admin.namespaces().createNamespace(namespace);
         admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
+        Awaitility.await().untilAsserted(() -> {
+            OffloadPolicies policiesGot = 
admin.namespaces().getOffloadPolicies(namespace);
+            assertNotNull(policiesGot);
+        });
 
         // Inject the cache to avoid real load off-loader jar
         final Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = 
pulsar.getLedgerOffloaderMap();
@@ -1923,8 +1928,20 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // (2) test system topic
         for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
-            managedLedgerConfig = 
brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
-            Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), 
NullLedgerOffloader.INSTANCE);
+            boolean offloadPoliciesExists = false;
+            try {
+                OffloadPolicies policiesGot =
+                        
admin.namespaces().getOffloadPolicies(TopicName.get(eventTopicName).getNamespace());
+                offloadPoliciesExists = policiesGot != null;
+            } catch (PulsarAdminException.NotFoundException notFoundException) 
{
+                offloadPoliciesExists = false;
+            }
+            var managedLedgerConfig2 = 
brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
+            if (offloadPoliciesExists) {
+                Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader() 
instanceof NonAppendableLedgerOffloader);
+            } else {
+                Assert.assertEquals(managedLedgerConfig2.getLedgerOffloader(), 
NullLedgerOffloader.INSTANCE);
+            }
         }
     }
 }

Reply via email to