This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 629230c Use the same thread to monitor ledger and index directories
629230c is described below
commit 629230cb13a7ccb1211343dbb0a8bfe7a674ed96
Author: Like <[email protected]>
AuthorDate: Sun Mar 3 23:14:59 2019 +0800
Use the same thread to monitor ledger and index directories
Closes #1655
Reviewers: Charan Reddy Guttapalem <[email protected]>, Sijie Guo
<[email protected]>
This closes #1957 from liketic/reduce-monitor-threads
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 79 +++++-----------------
.../bookkeeper/bookie/LedgerDirsMonitor.java | 29 +++++---
.../bookie/BookieInitializationTest.java | 4 +-
.../bookie/BookieStorageThresholdTest.java | 19 +++---
.../apache/bookkeeper/bookie/CompactionTest.java | 2 +-
.../bookkeeper/bookie/IndexPersistenceMgrTest.java | 4 +-
.../bookkeeper/bookie/TestLedgerDirsManager.java | 11 +--
7 files changed, 57 insertions(+), 91 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index a0acd31..16b6312 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -94,7 +94,6 @@ import
org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,10 +122,8 @@ public class Bookie extends BookieCriticalThread {
static final long METAENTRY_ID_LEDGER_EXPLICITLAC = -0x8000;
private final LedgerDirsManager ledgerDirsManager;
- private LedgerDirsManager indexDirsManager;
-
- LedgerDirsMonitor ledgerMonitor;
- LedgerDirsMonitor idxMonitor;
+ private final LedgerDirsManager indexDirsManager;
+ LedgerDirsMonitor dirsMonitor;
// Registration Manager for managing registration
protected final MetadataBookieDriver metadataDriver;
@@ -232,7 +229,7 @@ public class Bookie extends BookieCriticalThread {
private void checkEnvironment(MetadataBookieDriver metadataDriver)
throws BookieException, IOException {
List<File> allLedgerDirs = new
ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
- +
indexDirsManager.getAllLedgerDirs().size());
+ + indexDirsManager.getAllLedgerDirs().size());
allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
if (indexDirsManager != ledgerDirsManager) {
allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
@@ -705,12 +702,17 @@ public class Bookie extends BookieCriticalThread {
stateManager = initializeStateManager();
// register shutdown handler using trigger mode
stateManager.setShutdownHandler(exitCode ->
triggerBookieShutdown(exitCode));
- // Initialise ledgerDirMonitor. This would look through all the
+ // Initialise dirsMonitor. This would look through all the
// configured directories. When disk errors or all the ledger
// directories are full, would throws exception and fail bookie
startup.
- this.ledgerMonitor = new LedgerDirsMonitor(conf, diskChecker,
ledgerDirsManager);
+ List<LedgerDirsManager> dirsManagers = new ArrayList<>();
+ dirsManagers.add(ledgerDirsManager);
+ if (indexDirsManager != ledgerDirsManager) {
+ dirsManagers.add(indexDirsManager);
+ }
+ this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker,
dirsManagers);
try {
- this.ledgerMonitor.init();
+ this.dirsMonitor.init();
} catch (NoWritableLedgerDirException nle) {
// start in read-only mode if no writable dirs and read-only
allowed
if (!conf.isReadOnlyModeEnabled()) {
@@ -720,23 +722,6 @@ public class Bookie extends BookieCriticalThread {
}
}
- if (ledgerDirsManager == indexDirsManager) {
- this.idxMonitor = this.ledgerMonitor;
- } else {
- this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker,
indexDirsManager);
- try {
- this.idxMonitor.init();
- } catch (NoWritableLedgerDirException nle) {
- // start in read-only mode if no writable dirs and read-only
allowed
- if (!conf.isReadOnlyModeEnabled()) {
- throw nle;
- } else {
- this.stateManager.transitionToReadOnlyMode();
- }
- }
- }
-
-
// instantiate the journals
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
@@ -912,22 +897,15 @@ public class Bookie extends BookieCriticalThread {
journalDirectories.stream().map(File::getName).collect(Collectors.joining(",
")));
}
//Start DiskChecker thread
- ledgerMonitor.start();
- if (indexDirsManager != ledgerDirsManager) {
- idxMonitor.start();
- }
+ dirsMonitor.start();
// replay journals
try {
readJournal();
- } catch (IOException ioe) {
+ } catch (IOException | BookieException ioe) {
LOG.error("Exception while replaying journals, shutting down",
ioe);
shutdown(ExitCode.BOOKIE_EXCEPTION);
return;
- } catch (BookieException be) {
- LOG.error("Exception while replaying journals, shutting down", be);
- shutdown(ExitCode.BOOKIE_EXCEPTION);
- return;
}
// Do a fully flush after journal replay
@@ -1184,11 +1162,7 @@ public class Bookie extends BookieCriticalThread {
}
//Shutdown disk checker
- ledgerMonitor.shutdown();
- if (indexDirsManager != ledgerDirsManager) {
- idxMonitor.shutdown();
- }
-
+ dirsMonitor.shutdown();
}
// Shutdown the ZK client
if (metadataDriver != null) {
@@ -1350,10 +1324,9 @@ public class Bookie extends BookieCriticalThread {
/**
* Add entry to a ledger.
- * @throws BookieException.LedgerFencedException if the ledger is fenced
*/
public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback
cb, Object ctx, byte[] masterKey)
- throws IOException, BookieException.LedgerFencedException,
BookieException, InterruptedException {
+ throws IOException, BookieException, InterruptedException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
int entrySize = 0;
@@ -1385,26 +1358,6 @@ public class Bookie extends BookieCriticalThread {
}
}
- static class FutureWriteCallback implements WriteCallback {
-
- SettableFuture<Boolean> result = SettableFuture.create();
-
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
- entryId, ledgerId, addr, rc);
- }
-
- result.set(0 == rc);
- }
-
- public SettableFuture<Boolean> getResult() {
- return result;
- }
- }
-
/**
* Fences a ledger. From this point on, clients will be unable to
* write to this ledger. Only recoveryAddEntry will be
@@ -1591,7 +1544,7 @@ public class Bookie extends BookieCriticalThread {
* @throws InterruptedException
*/
public static void main(String[] args)
- throws IOException, InterruptedException, BookieException,
KeeperException {
+ throws IOException, InterruptedException, BookieException {
Bookie b = new Bookie(new ServerConfiguration());
b.start();
CounterCallback cb = new CounterCallback();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index d003366..2b7c901 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -50,25 +50,24 @@ class LedgerDirsMonitor {
private final int interval;
private final ServerConfiguration conf;
- private final ConcurrentMap<File, Float> diskUsages;
private final DiskChecker diskChecker;
- private final LedgerDirsManager ldm;
+ private final List<LedgerDirsManager> dirsManagers;
private long minUsableSizeForHighPriorityWrites;
private ScheduledExecutorService executor;
private ScheduledFuture<?> checkTask;
public LedgerDirsMonitor(final ServerConfiguration conf,
final DiskChecker diskChecker,
- final LedgerDirsManager ldm) {
+ final List<LedgerDirsManager> dirsManagers) {
this.interval = conf.getDiskCheckInterval();
this.minUsableSizeForHighPriorityWrites =
conf.getMinUsableSizeForHighPriorityWrites();
this.conf = conf;
this.diskChecker = diskChecker;
- this.diskUsages = ldm.getDiskUsages();
- this.ldm = ldm;
+ this.dirsManagers = dirsManagers;
}
- private void check() {
+ private void check(final LedgerDirsManager ldm) {
+ final ConcurrentMap<File, Float> diskUsages = ldm.getDiskUsages();
try {
List<File> writableDirs = ldm.getWritableLedgerDirs();
// Check all writable dirs disk space usage.
@@ -171,6 +170,10 @@ class LedgerDirsMonitor {
}
}
+ private void check() {
+ dirsManagers.forEach(this::check);
+ }
+
/**
* Sweep through all the directories to check disk errors or disk full.
*
@@ -181,7 +184,7 @@ class LedgerDirsMonitor {
* less space than threshold
*/
public void init() throws DiskErrorException, NoWritableLedgerDirException
{
- checkDirs(ldm.getWritableLedgerDirs());
+ checkDirs();
}
// start the daemon for disk monitoring
@@ -191,7 +194,7 @@ class LedgerDirsMonitor {
.setNameFormat("LedgerDirsMonitorThread")
.setDaemon(true)
.build());
- this.checkTask = this.executor.scheduleAtFixedRate(() -> check(),
interval, interval, TimeUnit.MILLISECONDS);
+ this.checkTask = this.executor.scheduleAtFixedRate(this::check,
interval, interval, TimeUnit.MILLISECONDS);
}
// shutdown disk monitoring daemon
@@ -207,9 +210,15 @@ class LedgerDirsMonitor {
}
}
- public void checkDirs(List<File> writableDirs)
+ private void checkDirs() throws NoWritableLedgerDirException,
DiskErrorException {
+ for (LedgerDirsManager dirsManager : dirsManagers) {
+ checkDirs(dirsManager);
+ }
+ }
+
+ private void checkDirs(final LedgerDirsManager ldm)
throws DiskErrorException, NoWritableLedgerDirException {
- for (File dir : writableDirs) {
+ for (File dir : ldm.getWritableLedgerDirs()) {
try {
diskChecker.checkDir(dir);
} catch (DiskWarnThresholdException e) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 7dde6fd..c4ed031 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -50,6 +50,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -812,7 +813,8 @@ public class BookieInitializationTest extends
BookKeeperClusterTestCase {
LedgerDirsManager ldm = new LedgerDirsManager(conf,
conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
LedgerDirsMonitor ledgerMonitor = new LedgerDirsMonitor(conf,
- new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()), ldm);
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
+ Collections.singletonList(ldm));
ledgerMonitor.init();
fail("should throw exception");
} catch (Exception e) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index 6f7d1c9..0bd2a97 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -157,9 +158,9 @@ public class BookieStorageThresholdTest extends
BookKeeperClusterTestCase {
bs.add(server);
bsConfs.add(conf);
Bookie bookie = server.getBookie();
- // since we are going to set dependency injected ledgermonitor, so we
need to shutdown
- // the ledgermonitor which was created as part of the initialization
of Bookie
- bookie.ledgerMonitor.shutdown();
+ // since we are going to set dependency injected dirsMonitor, so we
need to shutdown
+ // the dirsMonitor which was created as part of the initialization of
Bookie
+ bookie.dirsMonitor.shutdown();
LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
@@ -183,13 +184,11 @@ public class BookieStorageThresholdTest extends
BookKeeperClusterTestCase {
// Dependency Injected class
ThresholdTestDiskChecker thresholdTestDiskChecker = new
ThresholdTestDiskChecker(
baseConf.getDiskUsageThreshold(),
baseConf.getDiskUsageWarnThreshold());
- LedgerDirsMonitor ledgerDirsMonitor = new LedgerDirsMonitor(baseConf,
thresholdTestDiskChecker,
- ledgerDirsManager);
- // set the ledgermonitor and idxmonitor and initiate/start it
- bookie.ledgerMonitor = ledgerDirsMonitor;
- bookie.idxMonitor = ledgerDirsMonitor;
- bookie.ledgerMonitor.init();
- bookie.ledgerMonitor.start();
+ bookie.dirsMonitor = new LedgerDirsMonitor(baseConf,
thresholdTestDiskChecker,
+ Collections.singletonList(ledgerDirsManager));
+ // set the dirsMonitor and initiate/start it
+ bookie.dirsMonitor.init();
+ bookie.dirsMonitor.start();
// create ledgers and add fragments
LedgerHandle[] lhs = prepareData(3);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 3d7d2c4..0d6e5d6 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -451,7 +451,7 @@ public abstract class CompactionTest extends
BookKeeperClusterTestCase {
for (BookieServer bookieServer : bs) {
Bookie bookie = bookieServer.getBookie();
bookie.ledgerStorage.flush();
- bookie.ledgerMonitor.shutdown();
+ bookie.dirsMonitor.shutdown();
LedgerDirsManager ledgerDirsManager =
bookie.getLedgerDirsManager();
List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
// Major and Minor compaction are not disabled even though discs
are full. Check LedgerDirsListener of
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 909e646..5b31b71 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.GeneralSecurityException;
import java.util.Arrays;
+import java.util.Collections;
import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
import org.apache.bookkeeper.client.BookKeeper;
@@ -82,7 +83,8 @@ public class IndexPersistenceMgrTest {
ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
ledgerMonitor = new LedgerDirsMonitor(conf,
- new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()), ledgerDirsManager);
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
+ Collections.singletonList(ledgerDirsManager));
ledgerMonitor.init();
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 9d166a4..e0a4838 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -32,6 +32,7 @@ import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -112,7 +113,7 @@ public class TestLedgerDirsManager {
dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()), statsLogger);
ledgerMonitor = new LedgerDirsMonitor(conf,
- mockDiskChecker, dirsManager);
+ mockDiskChecker, Collections.singletonList(dirsManager));
ledgerMonitor.init();
}
@@ -215,7 +216,7 @@ public class TestLedgerDirsManager {
conf.setMinUsableSizeForHighPriorityWrites(curDir.getUsableSpace()
+ 1024);
dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()), statsLogger);
- ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
dirsManager);
+ ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
Collections.singletonList(dirsManager));
ledgerMonitor.init();
}
@@ -257,7 +258,7 @@ public class TestLedgerDirsManager {
mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()));
- ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
dirsManager);
+ ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
Collections.singletonList(dirsManager));
ledgerMonitor.init();
final MockLedgerDirsListener mockLedgerDirsListener = new
MockLedgerDirsListener();
dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
@@ -321,7 +322,7 @@ public class TestLedgerDirsManager {
dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
statsLogger);
- ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
dirsManager);
+ ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
Collections.singletonList(dirsManager));
usageMap = new HashMap<File, Float>();
usageMap.put(curDir1, 0.1f);
usageMap.put(curDir2, 0.1f);
@@ -412,7 +413,7 @@ public class TestLedgerDirsManager {
new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()),
statsLogger);
- ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
dirsManager);
+ ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
Collections.singletonList(dirsManager));
try {
ledgerMonitor.init();
fail("NoWritableLedgerDirException expected");