This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 660ee85e93476cec9867a92fcc319c2fdd10e32c
Author: frankxieke <[email protected]>
AuthorDate: Wed Oct 13 09:05:25 2021 +0800

    [offload] fix FileSystemManagedLedgerOffloader can not cleanup outdated 
ledger data (#12309)
    
    ### Motivation
    When using FileSystem Offloader, and you set the ledger retention policy, 
after the offloaded ledger is outdated, however the data can not be deleted 
from file system. This is because the datafile path is wrong.
    
    For example, in fact the datafile path is 
"file:///Users/pulsar_nfs/public/default/persistent/test_pulsar_delta/449-775f0961-9719-4658-8357-6b0edbdef7a3",
 but is mistaken formatted as 
"file:///Users/pulsar_nfs/null/449-775f0961-9719-4658-8357-6b0edbdef7a3".
    
    The reason is when format the data path, "ManagedLedgerName" property in 
"offloadDriverMetadata" is missing.
    
    ### Modifications
    Before format the data path, add "ManagedLedgerName" property in 
"offloadDriverMetadata" map.
    
    (cherry picked from commit f28303068e720cba9bd0df1484843ed27b3dc753)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   6 +-
 .../mledger/impl/OffloadLedgerDeleteTest.java      | 149 +++++++++++++++++++++
 2 files changed, 154 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d93ad09..52cae3e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3077,9 +3077,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             Map<String, String> offloadDriverMetadata, String cleanupReason) {
         log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the 
reason {}.",
                 name, ledgerId, uuid.toString(), cleanupReason);
+        Map<String, String> metadataMap = Maps.newHashMap();
+        metadataMap.putAll(offloadDriverMetadata);
+        metadataMap.put("ManagedLedgerName", name);
+
         Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), 
TimeUnit.SECONDS.toHours(1)).limit(10),
                 Retries.NonFatalPredicate,
-                () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, 
uuid, offloadDriverMetadata),
+                () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, 
uuid, metadataMap),
                 scheduledExecutor, name).whenComplete((ignored, exception) -> {
                     if (exception != null) {
                         log.warn("[{}] Error cleaning up offload for {}, 
(cleanup reason: {})",
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
index f25332e..e36835f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadLedgerDeleteTest.java
@@ -20,11 +20,16 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static 
org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -43,6 +48,102 @@ import org.testng.annotations.Test;
 public class OffloadLedgerDeleteTest extends MockedBookKeeperTestCase {
     private static final Logger log = 
LoggerFactory.getLogger(OffloadLedgerDeleteTest.class);
 
+
+    static class MockFileSystemLedgerOffloader implements LedgerOffloader {
+        interface InjectAfterOffload {
+            void call();
+        }
+
+        private String storageBasePath = "/Users/pulsar_filesystem_offloader";
+
+        private static String getStoragePath(String storageBasePath, String 
managedLedgerName) {
+            return storageBasePath == null ? managedLedgerName + "/" : 
storageBasePath + "/" + managedLedgerName + "/";
+        }
+
+        private static String getDataFilePath(String storagePath, long 
ledgerId, UUID uuid) {
+            return storagePath + ledgerId + "-" + uuid.toString();
+        }
+
+        ConcurrentHashMap<Long, String> offloads = new ConcurrentHashMap<Long, 
String>();
+        ConcurrentHashMap<Long, String> deletes = new ConcurrentHashMap<Long, 
String>();
+        OffloadPrefixTest.MockLedgerOffloader.InjectAfterOffload inject = null;
+
+        Set<Long> offloadedLedgers() {
+            return offloads.keySet();
+        }
+
+        Set<Long> deletedOffloads() {
+            return deletes.keySet();
+        }
+
+        OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create("filesystem", "", "", "",
+                null, null,
+                null, null,
+                OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+                OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+                OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);
+
+        @Override
+        public String getOffloadDriverName() {
+            return "mockfilesystem";
+        }
+
+        @Override
+        public CompletableFuture<Void> offload(ReadHandle ledger,
+                                               UUID uuid,
+                                               Map<String, String> 
extraMetadata) {
+            Assert.assertNotNull(extraMetadata.get("ManagedLedgerName"));
+            String storagePath = getStoragePath(storageBasePath, 
extraMetadata.get("ManagedLedgerName"));
+            String dataFilePath = getDataFilePath(storagePath, ledger.getId(), 
uuid);
+            CompletableFuture<Void> promise = new CompletableFuture<>();
+            if (offloads.putIfAbsent(ledger.getId(), dataFilePath) == null) {
+                promise.complete(null);
+            } else {
+                promise.completeExceptionally(new Exception("Already exists 
exception"));
+            }
+
+            if (inject != null) {
+                inject.call();
+            }
+            return promise;
+        }
+
+        @Override
+        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID 
uuid,
+                                                           Map<String, String> 
offloadDriverMetadata) {
+            CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+            promise.completeExceptionally(new UnsupportedOperationException());
+            return promise;
+        }
+
+        @Override
+        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID 
uuid,
+                                                       Map<String, String> 
offloadDriverMetadata) {
+            
Assert.assertNotNull(offloadDriverMetadata.get("ManagedLedgerName"));
+            String storagePath = getStoragePath(storageBasePath, 
offloadDriverMetadata.get("ManagedLedgerName"));
+            String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+            CompletableFuture<Void> promise = new CompletableFuture<>();
+            if (offloads.remove(ledgerId, dataFilePath)) {
+                deletes.put(ledgerId, dataFilePath);
+                promise.complete(null);
+            } else {
+                promise.completeExceptionally(new Exception("Not found"));
+            }
+            return promise;
+        };
+
+        @Override
+        public OffloadPoliciesImpl getOffloadPolicies() {
+            return offloadPolicies;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
     @Test
     public void testLaggedDelete() throws Exception {
         OffloadPrefixTest.MockLedgerOffloader offloader = new 
OffloadPrefixTest.MockLedgerOffloader();
@@ -105,6 +206,54 @@ public class OffloadLedgerDeleteTest extends 
MockedBookKeeperTestCase {
         assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedgerId));
     }
 
+    @Test(timeOut = 5000)
+    public void testFileSystemOffloadDeletePath() throws Exception {
+        MockFileSystemLedgerOffloader offloader = new 
MockFileSystemLedgerOffloader();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        MockClock clock = new MockClock();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(3, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        
offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(300000L);
+        config.setLedgerOffloader(offloader);
+        config.setClock(clock);
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my_test_ledger_filesystem", config);
+        int i = 0;
+        for (; i < 15; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        long firstLedgerId = 
ledger.getLedgersInfoAsList().get(0).getLedgerId();
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                        .filter(e -> e.getOffloadContext().getComplete())
+                        .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+                offloader.offloadedLedgers());
+        Assert.assertTrue(bkc.getLedgers().contains(firstLedgerId));
+
+        // ledger still exists in list
+        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
+                        .filter(e -> e.getOffloadContext().getComplete())
+                        .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
+                offloader.offloadedLedgers());
+
+        // move past retention, should be deleted from offloaded also
+        clock.advance(5, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise3 = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise3);
+        promise3.join();
+
+        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+        assertEventuallyTrue(() -> 
offloader.deletedOffloads().contains(firstLedgerId));
+    }
+
     @Test
     public void testLaggedDeleteRetentionSetLower() throws Exception {
         OffloadPrefixTest.MockLedgerOffloader offloader = new 
OffloadPrefixTest.MockLedgerOffloader();

Reply via email to