This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 629230c  Use the same thread to monitor ledger and index directories
629230c is described below

commit 629230cb13a7ccb1211343dbb0a8bfe7a674ed96
Author: Like <[email protected]>
AuthorDate: Sun Mar 3 23:14:59 2019 +0800

    Use the same thread to monitor ledger and index directories
    
    Closes #1655
    
    Reviewers: Charan Reddy Guttapalem <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1957 from liketic/reduce-monitor-threads
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 79 +++++-----------------
 .../bookkeeper/bookie/LedgerDirsMonitor.java       | 29 +++++---
 .../bookie/BookieInitializationTest.java           |  4 +-
 .../bookie/BookieStorageThresholdTest.java         | 19 +++---
 .../apache/bookkeeper/bookie/CompactionTest.java   |  2 +-
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java |  4 +-
 .../bookkeeper/bookie/TestLedgerDirsManager.java   | 11 +--
 7 files changed, 57 insertions(+), 91 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index a0acd31..16b6312 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -94,7 +94,6 @@ import 
org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,10 +122,8 @@ public class Bookie extends BookieCriticalThread {
     static final long METAENTRY_ID_LEDGER_EXPLICITLAC  = -0x8000;
 
     private final LedgerDirsManager ledgerDirsManager;
-    private LedgerDirsManager indexDirsManager;
-
-    LedgerDirsMonitor ledgerMonitor;
-    LedgerDirsMonitor idxMonitor;
+    private final LedgerDirsManager indexDirsManager;
+    LedgerDirsMonitor dirsMonitor;
 
     // Registration Manager for managing registration
     protected final MetadataBookieDriver metadataDriver;
@@ -232,7 +229,7 @@ public class Bookie extends BookieCriticalThread {
     private void checkEnvironment(MetadataBookieDriver metadataDriver)
             throws BookieException, IOException {
         List<File> allLedgerDirs = new 
ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
-                                                     + 
indexDirsManager.getAllLedgerDirs().size());
+                + indexDirsManager.getAllLedgerDirs().size());
         allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
         if (indexDirsManager != ledgerDirsManager) {
             allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
@@ -705,12 +702,17 @@ public class Bookie extends BookieCriticalThread {
         stateManager = initializeStateManager();
         // register shutdown handler using trigger mode
         stateManager.setShutdownHandler(exitCode -> 
triggerBookieShutdown(exitCode));
-        // Initialise ledgerDirMonitor. This would look through all the
+        // Initialise dirsMonitor. This would look through all the
         // configured directories. When disk errors or all the ledger
         // directories are full, would throws exception and fail bookie 
startup.
-        this.ledgerMonitor = new LedgerDirsMonitor(conf, diskChecker, 
ledgerDirsManager);
+        List<LedgerDirsManager> dirsManagers = new ArrayList<>();
+        dirsManagers.add(ledgerDirsManager);
+        if (indexDirsManager != ledgerDirsManager) {
+            dirsManagers.add(indexDirsManager);
+        }
+        this.dirsMonitor = new LedgerDirsMonitor(conf, diskChecker, 
dirsManagers);
         try {
-            this.ledgerMonitor.init();
+            this.dirsMonitor.init();
         } catch (NoWritableLedgerDirException nle) {
             // start in read-only mode if no writable dirs and read-only 
allowed
             if (!conf.isReadOnlyModeEnabled()) {
@@ -720,23 +722,6 @@ public class Bookie extends BookieCriticalThread {
             }
         }
 
-        if (ledgerDirsManager == indexDirsManager) {
-            this.idxMonitor = this.ledgerMonitor;
-        } else {
-            this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker, 
indexDirsManager);
-            try {
-                this.idxMonitor.init();
-            } catch (NoWritableLedgerDirException nle) {
-                // start in read-only mode if no writable dirs and read-only 
allowed
-                if (!conf.isReadOnlyModeEnabled()) {
-                    throw nle;
-                } else {
-                    this.stateManager.transitionToReadOnlyMode();
-                }
-            }
-        }
-
-
         // instantiate the journals
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
@@ -912,22 +897,15 @@ public class Bookie extends BookieCriticalThread {
                     
journalDirectories.stream().map(File::getName).collect(Collectors.joining(", 
")));
         }
         //Start DiskChecker thread
-        ledgerMonitor.start();
-        if (indexDirsManager != ledgerDirsManager) {
-            idxMonitor.start();
-        }
+        dirsMonitor.start();
 
         // replay journals
         try {
             readJournal();
-        } catch (IOException ioe) {
+        } catch (IOException | BookieException ioe) {
             LOG.error("Exception while replaying journals, shutting down", 
ioe);
             shutdown(ExitCode.BOOKIE_EXCEPTION);
             return;
-        } catch (BookieException be) {
-            LOG.error("Exception while replaying journals, shutting down", be);
-            shutdown(ExitCode.BOOKIE_EXCEPTION);
-            return;
         }
 
         // Do a fully flush after journal replay
@@ -1184,11 +1162,7 @@ public class Bookie extends BookieCriticalThread {
                 }
 
                 //Shutdown disk checker
-                ledgerMonitor.shutdown();
-                if (indexDirsManager != ledgerDirsManager) {
-                    idxMonitor.shutdown();
-                }
-
+                dirsMonitor.shutdown();
             }
             // Shutdown the ZK client
             if (metadataDriver != null) {
@@ -1350,10 +1324,9 @@ public class Bookie extends BookieCriticalThread {
 
     /**
      * Add entry to a ledger.
-     * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
     public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback 
cb, Object ctx, byte[] masterKey)
-            throws IOException, BookieException.LedgerFencedException, 
BookieException, InterruptedException {
+            throws IOException, BookieException, InterruptedException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
         int entrySize = 0;
@@ -1385,26 +1358,6 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    static class FutureWriteCallback implements WriteCallback {
-
-        SettableFuture<Boolean> result = SettableFuture.create();
-
-        @Override
-        public void writeComplete(int rc, long ledgerId, long entryId,
-                                  BookieSocketAddress addr, Object ctx) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
-                        entryId, ledgerId, addr, rc);
-            }
-
-            result.set(0 == rc);
-        }
-
-        public SettableFuture<Boolean> getResult() {
-            return result;
-        }
-    }
-
     /**
      * Fences a ledger. From this point on, clients will be unable to
      * write to this ledger. Only recoveryAddEntry will be
@@ -1591,7 +1544,7 @@ public class Bookie extends BookieCriticalThread {
      * @throws InterruptedException
      */
     public static void main(String[] args)
-            throws IOException, InterruptedException, BookieException, 
KeeperException {
+            throws IOException, InterruptedException, BookieException {
         Bookie b = new Bookie(new ServerConfiguration());
         b.start();
         CounterCallback cb = new CounterCallback();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index d003366..2b7c901 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -50,25 +50,24 @@ class LedgerDirsMonitor {
 
     private final int interval;
     private final ServerConfiguration conf;
-    private final ConcurrentMap<File, Float> diskUsages;
     private final DiskChecker diskChecker;
-    private final LedgerDirsManager ldm;
+    private final List<LedgerDirsManager> dirsManagers;
     private long minUsableSizeForHighPriorityWrites;
     private ScheduledExecutorService executor;
     private ScheduledFuture<?> checkTask;
 
     public LedgerDirsMonitor(final ServerConfiguration conf,
                              final DiskChecker diskChecker,
-                             final LedgerDirsManager ldm) {
+                             final List<LedgerDirsManager> dirsManagers) {
         this.interval = conf.getDiskCheckInterval();
         this.minUsableSizeForHighPriorityWrites = 
conf.getMinUsableSizeForHighPriorityWrites();
         this.conf = conf;
         this.diskChecker = diskChecker;
-        this.diskUsages = ldm.getDiskUsages();
-        this.ldm = ldm;
+        this.dirsManagers = dirsManagers;
     }
 
-    private void check() {
+    private void check(final LedgerDirsManager ldm) {
+        final ConcurrentMap<File, Float> diskUsages = ldm.getDiskUsages();
         try {
             List<File> writableDirs = ldm.getWritableLedgerDirs();
             // Check all writable dirs disk space usage.
@@ -171,6 +170,10 @@ class LedgerDirsMonitor {
         }
     }
 
+    private void check() {
+        dirsManagers.forEach(this::check);
+    }
+
     /**
      * Sweep through all the directories to check disk errors or disk full.
      *
@@ -181,7 +184,7 @@ class LedgerDirsMonitor {
      *             less space than threshold
      */
     public void init() throws DiskErrorException, NoWritableLedgerDirException 
{
-        checkDirs(ldm.getWritableLedgerDirs());
+        checkDirs();
     }
 
     // start the daemon for disk monitoring
@@ -191,7 +194,7 @@ class LedgerDirsMonitor {
                 .setNameFormat("LedgerDirsMonitorThread")
                 .setDaemon(true)
                 .build());
-        this.checkTask = this.executor.scheduleAtFixedRate(() -> check(), 
interval, interval, TimeUnit.MILLISECONDS);
+        this.checkTask = this.executor.scheduleAtFixedRate(this::check, 
interval, interval, TimeUnit.MILLISECONDS);
     }
 
     // shutdown disk monitoring daemon
@@ -207,9 +210,15 @@ class LedgerDirsMonitor {
         }
     }
 
-    public void checkDirs(List<File> writableDirs)
+    private void checkDirs() throws NoWritableLedgerDirException, 
DiskErrorException {
+        for (LedgerDirsManager dirsManager : dirsManagers) {
+            checkDirs(dirsManager);
+        }
+    }
+
+    private void checkDirs(final LedgerDirsManager ldm)
             throws DiskErrorException, NoWritableLedgerDirException {
-        for (File dir : writableDirs) {
+        for (File dir : ldm.getWritableLedgerDirs()) {
             try {
                 diskChecker.checkDir(dir);
             } catch (DiskWarnThresholdException e) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 7dde6fd..c4ed031 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -50,6 +50,7 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -812,7 +813,8 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
             LedgerDirsManager ldm = new LedgerDirsManager(conf, 
conf.getLedgerDirs(),
                     new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
             LedgerDirsMonitor ledgerMonitor = new LedgerDirsMonitor(conf,
-                    new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()), ldm);
+                    new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
+                    Collections.singletonList(ldm));
             ledgerMonitor.init();
             fail("should throw exception");
         } catch (Exception e) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index 6f7d1c9..0bd2a97 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -157,9 +158,9 @@ public class BookieStorageThresholdTest extends 
BookKeeperClusterTestCase {
         bs.add(server);
         bsConfs.add(conf);
         Bookie bookie = server.getBookie();
-        // since we are going to set dependency injected ledgermonitor, so we 
need to shutdown
-        // the ledgermonitor which was created as part of the initialization 
of Bookie
-        bookie.ledgerMonitor.shutdown();
+        // since we are going to set dependency injected dirsMonitor, so we 
need to shutdown
+        // the dirsMonitor which was created as part of the initialization of 
Bookie
+        bookie.dirsMonitor.shutdown();
 
         LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
 
@@ -183,13 +184,11 @@ public class BookieStorageThresholdTest extends 
BookKeeperClusterTestCase {
         // Dependency Injected class
         ThresholdTestDiskChecker thresholdTestDiskChecker = new 
ThresholdTestDiskChecker(
                 baseConf.getDiskUsageThreshold(), 
baseConf.getDiskUsageWarnThreshold());
-        LedgerDirsMonitor ledgerDirsMonitor = new LedgerDirsMonitor(baseConf, 
thresholdTestDiskChecker,
-                ledgerDirsManager);
-        // set the ledgermonitor and idxmonitor and initiate/start it
-        bookie.ledgerMonitor = ledgerDirsMonitor;
-        bookie.idxMonitor = ledgerDirsMonitor;
-        bookie.ledgerMonitor.init();
-        bookie.ledgerMonitor.start();
+        bookie.dirsMonitor = new LedgerDirsMonitor(baseConf, 
thresholdTestDiskChecker,
+                Collections.singletonList(ledgerDirsManager));
+        // set the dirsMonitor and initiate/start it
+        bookie.dirsMonitor.init();
+        bookie.dirsMonitor.start();
 
         // create ledgers and add fragments
         LedgerHandle[] lhs = prepareData(3);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 3d7d2c4..0d6e5d6 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -451,7 +451,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
         for (BookieServer bookieServer : bs) {
             Bookie bookie = bookieServer.getBookie();
             bookie.ledgerStorage.flush();
-            bookie.ledgerMonitor.shutdown();
+            bookie.dirsMonitor.shutdown();
             LedgerDirsManager ledgerDirsManager = 
bookie.getLedgerDirsManager();
             List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
             // Major and Minor compaction are not disabled even though discs 
are full. Check LedgerDirsListener of
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 909e646..5b31b71 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
+import java.util.Collections;
 
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -82,7 +83,8 @@ public class IndexPersistenceMgrTest {
         ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
         ledgerMonitor = new LedgerDirsMonitor(conf,
-                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()), ledgerDirsManager);
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
+                Collections.singletonList(ledgerDirsManager));
         ledgerMonitor.init();
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 9d166a4..e0a4838 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -32,6 +32,7 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -112,7 +113,7 @@ public class TestLedgerDirsManager {
         dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()), statsLogger);
         ledgerMonitor = new LedgerDirsMonitor(conf,
-                mockDiskChecker, dirsManager);
+                mockDiskChecker, Collections.singletonList(dirsManager));
         ledgerMonitor.init();
     }
 
@@ -215,7 +216,7 @@ public class TestLedgerDirsManager {
             conf.setMinUsableSizeForHighPriorityWrites(curDir.getUsableSpace() 
+ 1024);
             dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()), statsLogger);
-            ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
dirsManager);
+            ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
Collections.singletonList(dirsManager));
             ledgerMonitor.init();
         }
 
@@ -257,7 +258,7 @@ public class TestLedgerDirsManager {
         mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
         dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
-        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
dirsManager);
+        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
Collections.singletonList(dirsManager));
         ledgerMonitor.init();
         final MockLedgerDirsListener mockLedgerDirsListener = new 
MockLedgerDirsListener();
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
@@ -321,7 +322,7 @@ public class TestLedgerDirsManager {
         dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
                 statsLogger);
-        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
dirsManager);
+        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
Collections.singletonList(dirsManager));
         usageMap = new HashMap<File, Float>();
         usageMap.put(curDir1, 0.1f);
         usageMap.put(curDir2, 0.1f);
@@ -412,7 +413,7 @@ public class TestLedgerDirsManager {
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
                 statsLogger);
 
-        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
dirsManager);
+        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
Collections.singletonList(dirsManager));
         try {
             ledgerMonitor.init();
             fail("NoWritableLedgerDirException expected");

Reply via email to