This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a95a382 Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
a95a382 is described below
commit a95a3824e0b1c207ea844fbce724734764f0be62
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Tue Nov 2 02:13:47 2021 +0900
Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0bc88c5..10ccde2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1324,14 +1324,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
factory.close(this);
STATE_UPDATER.set(this, State.Closed);
-
- if (this.timeoutTask != null) {
- this.timeoutTask.cancel(false);
- }
-
- if (this.checkLedgerRollTask != null) {
- this.checkLedgerRollTask.cancel(false);
- }
+ cancelScheduledTasks();
LedgerHandle lh = currentLedger;
@@ -2606,6 +2599,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
STATE_UPDATER.set(this, State.Fenced);
+ cancelScheduledTasks();
List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
if (cursors.isEmpty()) {
@@ -4003,4 +3997,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
+ private void cancelScheduledTasks() {
+ if (this.timeoutTask != null) {
+ this.timeoutTask.cancel(false);
+ }
+
+ if (this.checkLedgerRollTask != null) {
+ this.checkLedgerRollTask.cancel(false);
+ }
+ }
+
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4358c2f..d10fcdd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -3360,4 +3361,36 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
managedLedgerB.close();
}
+
+ @Test
+ public void testCancellationOfScheduledTasks() throws Exception {
+ Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask");
+ timeoutTaskField.setAccessible(true);
+ Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask");
+ checkLedgerRollTaskField.setAccessible(true);
+
+ ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1");
+ ledger1.addEntry("dummy-entry-1".getBytes(Encoding));
+ ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1);
+ assertNotNull(timeoutTask1);
+ assertFalse(timeoutTask1.isDone());
+ ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1);
+ assertNotNull(checkLedgerRollTask1);
+ assertFalse(checkLedgerRollTask1.isDone());
+ ledger1.close();
+ assertTrue(timeoutTask1.isCancelled());
+ assertTrue(checkLedgerRollTask1.isCancelled());
+
+ ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2");
+ ledger2.addEntry("dummy-entry-2".getBytes(Encoding));
+ ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2);
+ assertNotNull(timeoutTask2);
+ assertFalse(timeoutTask2.isDone());
+ ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2);
+ assertNotNull(checkLedgerRollTask2);
+ assertFalse(checkLedgerRollTask2.isDone());
+ ledger2.delete();
+ assertTrue(timeoutTask2.isCancelled());
+ assertTrue(checkLedgerRollTask2.isCancelled());
+ }
}