This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc21282348c45228226722e5e458aab49ed5b072
Author: Renkai Ge <[email protected]>
AuthorDate: Tue Jan 26 17:23:26 2021 +0800

    Fix fake complete issue in offloading (#9306)
    
    ### Motivation
    In the current code, `complete` in the offloading context may be set to true
    even when syncing the metadata to ZooKeeper has failed. This can lead to more
    fatal errors: the data in BookKeeper will be deleted, but other managed ledgers
    will see the data as not offloaded and try to read it from BookKeeper.
    
    ### Modification
    This PR makes sure the local ledger info is updated only after ZooKeeper has been updated.
    * prevent the ledgers info from changing unless the write to ZK succeeds
    * add a unit test to prevent a false positive when offloading fails
    
    (cherry picked from commit 3c22b473ddb124941d4bc9044ed6caaad97fab53)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 23 +++++++++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 49 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1e6b898..10c54bb 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2637,12 +2637,14 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                 } else {
                     try {
                         LedgerInfo newInfo = transformation.transform(oldInfo);
-                        ledgers.put(ledgerId, newInfo);
-                        store.asyncUpdateLedgerIds(name, 
getManagedLedgerInfo(), ledgersStat,
+                        final HashMap<Long, LedgerInfo> newLedgers = new 
HashMap<>(ledgers);
+                        newLedgers.put(ledgerId, newInfo);
+                        store.asyncUpdateLedgerIds(name, 
buildManagedLedgerInfo(newLedgers), ledgersStat,
                                 new MetaStoreCallback<Void>() {
                                     @Override
                                     public void operationComplete(Void result, 
Stat stat) {
                                         ledgersStat = stat;
+                                        ledgers.put(ledgerId, newInfo);
                                         unlockingPromise.complete(null);
                                     }
 
@@ -3111,6 +3113,23 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return mlInfo.build();
     }
 
+    private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> 
ledgers) {
+        ManagedLedgerInfo.Builder mlInfo = 
ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
+        if (state == State.Terminated) {
+            
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
+                    .setEntryId(lastConfirmedEntry.getEntryId()));
+        }
+        if (managedLedgerInterceptor != null) {
+            managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
+        }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+                    .setKey(property.getKey()).setValue(property.getValue()));
+        }
+
+        return mlInfo.build();
+    }
+
     /**
      * Throws an exception if the managed ledger has been previously fenced.
      *
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 807876e..81080db 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -47,11 +47,13 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.zookeeper.MockZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -979,8 +981,13 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
+        interface InjectAfterOffload {
+            void call();
+        }
+
         ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, 
UUID>();
         ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, 
UUID>();
+        InjectAfterOffload inject = null;
 
         Set<Long> offloadedLedgers() {
             return offloads.keySet();
@@ -1012,6 +1019,10 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
             } else {
                 promise.completeExceptionally(new Exception("Already exists 
exception"));
             }
+
+            if (inject != null) {
+                inject.call();
+            }
             return promise;
         }
 
@@ -1047,6 +1058,44 @@ public class OffloadPrefixTest extends 
MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    public void testFailByZk() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("my_test_ledger", config);
+
+        int i = 0;
+        for (; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        offloader.inject = () -> {
+            try {
+                stopZooKeeper();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        };
+
+        try {
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        } catch (Exception e) {
+
+        }
+        final LedgerInfo ledgerInfo = ledger.getLedgersInfoAsList().get(0);
+        final MLDataFormats.OffloadContext offloadContext = 
ledgerInfo.getOffloadContext();
+        //should not set complete when
+        assertEquals(offloadContext.getComplete(), false);
+        zkc = MockZooKeeper.newInstance();
+    }
+
     static class ErroringMockLedgerOffloader extends MockLedgerOffloader {
         CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();
 

Reply via email to