This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccf90d95a7 [fix][offload] Don't cleanup data when offload met
MetaStore exception (#21686)
1ccf90d95a7 is described below
commit 1ccf90d95a7e7d291b8d27f627d06ac7da607954
Author: Yong Zhang <[email protected]>
AuthorDate: Fri Dec 8 04:20:55 2023 +0800
[fix][offload] Don't cleanup data when offload met MetaStore exception
(#21686)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 55 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 1 deletion(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1e5d67871d4..8ce2a6924eb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3209,7 +3209,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
- private void offloadLoop(CompletableFuture<PositionImpl> promise,
Queue<LedgerInfo> ledgersToOffload,
+ void offloadLoop(CompletableFuture<PositionImpl> promise,
Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
State currentState = getState();
if (currentState == State.Closed) {
@@ -3257,6 +3257,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.error("[{}] Failed to update
offloaded metadata for the ledgerId {}, "
+ "the offloaded
data will not be cleaned up",
name, ledgerId, exception);
+ return;
} else {
log.error("[{}] Failed to offload
data for the ledgerId {}, "
+ "clean up the
offloaded data",
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 601cc4633d7..8430afb4e4f 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -21,7 +21,9 @@ package org.apache.bookkeeper.mledger.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -55,10 +57,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -4152,4 +4156,55 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
}
return taskCounter;
}
+
+ @Test
+ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens()
throws Exception {
+ ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
+ ManagedLedgerImpl ml = spy((ManagedLedgerImpl)
factory.open("testNoCleanupOffloadLedger", config));
+
+ // mock the ledger offloader
+ LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
+ when(config.getLedgerOffloader()).thenReturn(ledgerOffloader);
+ when(ledgerOffloader.getOffloadDriverName()).thenReturn("mock");
+
+ // There will have two put call to the metadata store, the first time
is prepare the offload.
+ // And the second is the complete the offload. This case is testing
when completing the offload,
+ // the metadata store meets an exception.
+ AtomicInteger metadataPutCallCount = new AtomicInteger(0);
+ metadataStore.failConditional(new MetadataStoreException("mock
completion error"),
+ (key, value) ->
key.equals(FaultInjectionMetadataStore.OperationType.PUT) &&
+ metadataPutCallCount.incrementAndGet() == 2);
+
+ // prepare the arguments for the offloadLoop method
+ CompletableFuture<PositionImpl> future = new CompletableFuture<>();
+ Queue<LedgerInfo> ledgersToOffload = new LinkedList<>();
+ LedgerInfo ledgerInfo =
LedgerInfo.getDefaultInstance().toBuilder().setLedgerId(1).setEntries(10).build();
+ ledgersToOffload.add(ledgerInfo);
+ PositionImpl firstUnoffloaded = new PositionImpl(1, 0);
+ Optional<Throwable> firstError = Optional.empty();
+
+ // mock the read handle to make the offload successful
+ CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+ readHandle.complete(mock(ReadHandle.class));
+
when(ml.getLedgerHandle(eq(ledgerInfo.getLedgerId()))).thenReturn(readHandle);
+ when(ledgerOffloader.offload(any(), any(),
anyMap())).thenReturn(CompletableFuture.completedFuture(null));
+
+ ml.ledgers.put(ledgerInfo.getLedgerId(), ledgerInfo);
+
+ // do the offload
+ ml.offloadLoop(future, ledgersToOffload, firstUnoffloaded, firstError);
+
+ // waiting for the offload complete
+ try {
+ future.join();
+ fail("The offload should fail");
+ } catch (Exception e) {
+ // the offload should fail
+ assertTrue(e.getCause().getMessage().contains("mock completion
error"));
+ }
+
+ // the ledger deletion shouldn't happen
+ verify(ledgerOffloader, times(0))
+ .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
+ }
}