This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 34bd4b38fa Upgrade mockito from 3.12.4 to 4.11.0 (#4218) 34bd4b38fa is described below commit 34bd4b38fa13f105608ae4e7b4c749124c271e1e Author: Zixuan Liu <node...@gmail.com> AuthorDate: Wed Feb 21 23:39:28 2024 +0800 Upgrade mockito from 3.12.4 to 4.11.0 (#4218) --- .../org/apache/bookkeeper/bookie/BookieImpl.java | 26 ++++++++++++--- .../java/org/apache/bookkeeper/bookie/Journal.java | 38 +++++++++++++++------- .../bookie/BookieMultipleJournalsTest.java | 4 +-- pom.xml | 2 +- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 654ace4547..a660a13ce8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory; /** * Implements a bookie. */ -public class BookieImpl extends BookieCriticalThread implements Bookie { +public class BookieImpl implements Bookie { private static final Logger LOG = LoggerFactory.getLogger(Bookie.class); @@ -119,6 +119,8 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { protected StateManager stateManager; + private BookieCriticalThread bookieThread; + // Expose Stats final StatsLogger statsLogger; private final BookieStats bookieStats; @@ -390,7 +392,6 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { ByteBufAllocator allocator, Supplier<BookieServiceInfo> bookieServiceInfoProvider) throws IOException, InterruptedException, BookieException { - super("Bookie-" + conf.getBookiePort()); this.bookieServiceInfoProvider = bookieServiceInfoProvider; this.statsLogger = statsLogger; this.conf = conf; @@ -656,7 +657,9 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { @Override public synchronized void start() { - setDaemon(true); + bookieThread = new BookieCriticalThread(() -> run(), "Bookie-" + conf.getBookiePort()); + bookieThread.setDaemon(true); + ThreadRegistry.register("BookieThread", 0); if (LOG.isDebugEnabled()) { LOG.debug("I'm starting a bookie with journal directories {}", @@ -717,7 +720,7 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { syncThread.start(); // start bookie thread - super.start(); + bookieThread.start(); // After successful bookie startup, register listener for disk // error/full notifications. @@ -741,6 +744,20 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { } } + @Override + public void join() throws InterruptedException { + if (bookieThread != null) { + bookieThread.join(); + } + } + + public boolean isAlive() { + if (bookieThread == null) { + return false; + } + return bookieThread.isAlive(); + } + /* * Get the DiskFailure listener for the bookie */ @@ -824,7 +841,6 @@ public class BookieImpl extends BookieCriticalThread implements Bookie { return stateManager.isRunning(); } - @Override public void run() { // start journals for (Journal journal: journals) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 3b730cb476..df10dfe522 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.bookie.stats.JournalStats; import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; @@ -66,13 +67,15 @@ import org.slf4j.LoggerFactory; /** * Provide journal related management. */ -public class Journal extends BookieCriticalThread implements CheckpointSource { +public class Journal implements CheckpointSource { private static final Logger LOG = LoggerFactory.getLogger(Journal.class); private static final RecyclableArrayList.Recycler<QueueEntry> entryListRecycler = new RecyclableArrayList.Recycler<QueueEntry>(); + private BookieCriticalThread thread; + /** * Filter to pickup journals. */ @@ -461,13 +464,13 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { volatile boolean running = true; // This holds the queue entries that should be notified after a // successful force write - Thread threadToNotifyOnEx; + Consumer<Void> threadToNotifyOnEx; // should we group force writes private final boolean enableGroupForceWrites; private final Counter forceWriteThreadTime; - public ForceWriteThread(Thread threadToNotifyOnEx, + public ForceWriteThread(Consumer<Void> threadToNotifyOnEx, boolean enableGroupForceWrites, StatsLogger statsLogger) { super("ForceWriteThread"); @@ -531,7 +534,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { // Regardless of what caused us to exit, we should notify the // the parent thread as it should either exit or be in the process // of exiting else we will have write requests hang - threadToNotifyOnEx.interrupt(); + threadToNotifyOnEx.accept(null); } private void syncJournal(ForceWriteRequest lastRequest) throws IOException { @@ -652,8 +655,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) { - super(journalThreadName + "-" + conf.getBookiePort()); - this.setPriority(Thread.MAX_PRIORITY); this.allocator = allocator; StatsLogger journalStatsLogger = statsLogger.scopeLabel("journalIndex", String.valueOf(journalIndex)); @@ -678,8 +679,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { this.journalWriteBufferSize = conf.getJournalWriteBufferSizeKB() * KB; this.syncData = conf.getJournalSyncData(); this.maxBackupJournals = conf.getMaxBackupJournals(); - this.forceWriteThread = new ForceWriteThread(this, conf.getJournalAdaptiveGroupWrites(), - journalStatsLogger); + this.forceWriteThread = new ForceWriteThread((__) -> this.interruptThread(), + conf.getJournalAdaptiveGroupWrites(), journalStatsLogger); this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec()); this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold(); this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold(); @@ -956,7 +957,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { * </p> * @see org.apache.bookkeeper.bookie.SyncThread */ - @Override public void run() { LOG.info("Starting journal on {}", journalDirectory); ThreadRegistry.register(journalThreadName, 0); @@ -1255,8 +1255,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { forceWriteThread.shutdown(); running = false; - this.interrupt(); - this.join(); + this.interruptThread(); + this.joinThread(); LOG.info("Finished Shutting down Journal thread"); } catch (IOException | InterruptedException ie) { Thread.currentThread().interrupt(); @@ -1284,7 +1284,21 @@ public class Journal extends BookieCriticalThread implements CheckpointSource { */ @VisibleForTesting public void joinThread() throws InterruptedException { - join(); + if (thread != null) { + thread.join(); + } + } + + public void interruptThread() { + if (thread != null) { + thread.interrupt(); + } + } + + public synchronized void start() { + thread = new BookieCriticalThread(() -> run(), journalThreadName + "-" + conf.getBookiePort()); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); } long getMemoryUsage() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java index b3622b3bb6..664e31541b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieMultipleJournalsTest.java @@ -74,7 +74,7 @@ public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase { Field journalList = bookie.getClass().getDeclaredField("journals"); journalList.setAccessible(true); List<Journal> journals = (List<Journal>) journalList.get(bookie); - journals.get(0).interrupt(); + journals.get(0).interruptThread(); Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); } @@ -92,7 +92,7 @@ public class BookieMultipleJournalsTest extends BookKeeperClusterTestCase { Field journalList = bookie.getClass().getDeclaredField("journals"); journalList.setAccessible(true); List<Journal> journals = (List<Journal>) journalList.get(bookie); - journals.get(0).interrupt(); + journals.get(0).interruptThread(); bookie.shutdown(ExitCode.OK); Awaitility.await().untilAsserted(() -> assertFalse(bookie.isRunning())); } diff --git a/pom.xml b/pom.xml index 69d1bbe56b..215700bce0 100644 --- a/pom.xml +++ b/pom.xml @@ -153,7 +153,7 @@ <lombok.version>1.18.30</lombok.version> <log4j.version>2.18.0</log4j.version> <lz4.version>1.3.0</lz4.version> - <mockito.version>3.12.4</mockito.version> + <mockito.version>4.11.0</mockito.version> <netty.version>4.1.104.Final</netty.version> <netty-iouring.version>0.0.24.Final</netty-iouring.version> <ostrich.version>9.1.3</ostrich.version>