This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a5f636  ISSUE280: Add StateManager to manage Bookie's state, and use 
it to turn bookie into readonly when sortedLedgerStorage failed to flush data
6a5f636 is described below

commit 6a5f6367a77094406ff0c938cf6b7f6d804451e1
Author: Arvin <[email protected]>
AuthorDate: Mon Jan 8 15:56:40 2018 -0800

    ISSUE280: Add StateManager to manage Bookie's state, and use it to turn 
bookie into readonly when sortedLedgerStorage failed to flush data
    
    Descriptions of the changes in this PR:
    
    Add SortedLedgerStorageListener to implement this, and add a test case to 
verify it.
    
    Master Issue: #280
    
    Author: Arvin <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>, Ivan Kelly <[email protected]>
    
    This closes #873 from ArvinDevel/issue280
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 258 +++--------------
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   8 +-
 .../bookkeeper/bookie/BookieStateManager.java      | 310 +++++++++++++++++++++
 .../bookie/InterleavedLedgerStorage.java           |   1 +
 .../apache/bookkeeper/bookie/LedgerStorage.java    |   1 +
 .../apache/bookkeeper/bookie/ReadOnlyBookie.java   |  29 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  12 +-
 .../org/apache/bookkeeper/bookie/StateManager.java |  97 +++++++
 .../bookie/storage/ldb/DbLedgerStorage.java        |   5 +-
 .../bookie/BookieInitializationTest.java           |  33 +--
 .../apache/bookkeeper/bookie/CompactionTest.java   |  11 +-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  66 ++++-
 .../bookie/SortedLedgerStorageCheckpointTest.java  |   2 +
 .../apache/bookkeeper/bookie/StateManagerTest.java | 258 +++++++++++++++++
 .../apache/bookkeeper/bookie/TestSyncThread.java   |   1 +
 .../bookie/storage/ldb/ConversionRollbackTest.java |   6 +-
 .../bookie/storage/ldb/ConversionTest.java         |  12 +-
 .../storage/ldb/LocationsIndexRebuildTest.java     |   4 +-
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   2 +
 .../bookkeeper/meta/LedgerManagerTestCase.java     |   2 +
 .../replication/AuditorLedgerCheckerTest.java      |   6 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |   2 +-
 .../apache/bookkeeper/test/ReadOnlyBookieTest.java |   4 +-
 23 files changed, 844 insertions(+), 286 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index aaabdb5..0b57d56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -30,13 +30,11 @@ import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_BYTES;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_BYTES;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
@@ -55,11 +53,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -82,7 +76,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -133,24 +126,12 @@ public class Bookie extends BookieCriticalThread {
     // Registration Manager for managing registration
     RegistrationManager registrationManager;
 
-    // Running flag
-    private volatile boolean running = false;
-    // Flag identify whether it is in shutting down progress
-    private volatile boolean shuttingdown = false;
-    // Bookie status
-    private final BookieStatus bookieStatus = new BookieStatus();
 
     private int exitCode = ExitCode.OK;
 
     private final ConcurrentLongHashMap<byte[]> masterKeyCache = new 
ConcurrentLongHashMap<>();
 
-    protected final String bookieId;
-
-    private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
-    protected final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
-    // executor to manage the state changes for a bookie.
-    final ExecutorService stateService = Executors.newSingleThreadExecutor(
-            new 
ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());
+    protected StateManager stateManager;
 
     // Expose Stats
     private final StatsLogger statsLogger;
@@ -246,12 +227,13 @@ public class Bookie extends BookieCriticalThread {
     }
 
     @VisibleForTesting
-    public void setRegistrationManager(RegistrationManager rm) {
-        this.registrationManager = rm;
+    public synchronized void setRegistrationManager(RegistrationManager rm) {
+            this.registrationManager = rm;
+            this.getStateManager().setRegistrationManager(rm);
     }
 
     @VisibleForTesting
-    public RegistrationManager getRegistrationManager() {
+    public synchronized RegistrationManager getRegistrationManager() {
         return this.registrationManager;
     }
 
@@ -667,7 +649,9 @@ public class Bookie extends BookieCriticalThread {
         } catch (KeeperException e) {
             throw new MetadataStoreException("Failed to initialize ledger 
manager", e);
         }
-
+        stateManager = new BookieStateManager(conf, statsLogger, 
registrationManager, ledgerDirsManager);
+        // register shutdown handler using trigger mode
+        stateManager.setShutdownHandler(exitCode -> 
triggerBookieShutdown(exitCode));
         // Initialise ledgerDirMonitor. This would look through all the
         // configured directories. When disk errors or all the ledger
         // directories are full, would throws exception and fail bookie 
startup.
@@ -679,7 +663,7 @@ public class Bookie extends BookieCriticalThread {
             if (!conf.isReadOnlyModeEnabled()) {
                 throw nle;
             } else {
-                this.transitionToReadOnlyMode();
+                this.stateManager.transitionToReadOnlyMode();
             }
         }
 
@@ -694,13 +678,11 @@ public class Bookie extends BookieCriticalThread {
                 if (!conf.isReadOnlyModeEnabled()) {
                     throw nle;
                 } else {
-                    this.transitionToReadOnlyMode();
+                    this.stateManager.transitionToReadOnlyMode();
                 }
             }
         }
 
-        // ZK ephemeral node for this Bookie.
-        this.bookieId = getMyId();
 
         // instantiate the journals
         journals = Lists.newArrayList();
@@ -722,6 +704,7 @@ public class Bookie extends BookieCriticalThread {
             ledgerManager,
             ledgerDirsManager,
             indexDirsManager,
+            stateManager,
             checkpointSource,
             syncThread,
             statsLogger);
@@ -737,28 +720,6 @@ public class Bookie extends BookieCriticalThread {
         readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
         addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
         readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
-        // 1 : up, 0 : readonly, -1 : unregistered
-        statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                if (!rmRegistered.get()){
-                    return -1;
-                } else if (forceReadOnly.get() || 
bookieStatus.isInReadOnlyMode()) {
-                    return 0;
-                } else {
-                    return 1;
-                }
-            }
-        });
-    }
-
-    private String getMyId() throws UnknownHostException {
-        return Bookie.getBookieAddress(conf).toString();
     }
 
     void readJournal() throws IOException, BookieException {
@@ -877,19 +838,13 @@ public class Bookie extends BookieCriticalThread {
 
         ledgerStorage.start();
 
-        // check the bookie status to start with
-        if (forceReadOnly.get()) {
-            this.bookieStatus.setToReadOnlyMode();
-        } else if (conf.isPersistBookieStatusEnabled()) {
-            
this.bookieStatus.readFromDirectories(ledgerDirsManager.getAllLedgerDirs());
-        }
-
-        // set running here.
+        // check the bookie status to start with, and set running.
         // since bookie server use running as a flag to tell bookie server 
whether it is alive
         // if setting it in bookie thread, the watcher might run before bookie 
thread.
-        running = true;
+        stateManager.initState();
+
         try {
-            registerBookie(true).get();
+            stateManager.registerBookie(true).get();
         } catch (Exception e) {
             LOG.error("Couldn't register bookie with zookeeper, shutting down 
: ", e);
             shutdown(ExitCode.ZK_REG_FAIL);
@@ -922,7 +877,7 @@ public class Bookie extends BookieCriticalThread {
             @Override
             public void allDisksFull() {
                 // Transition to readOnly mode on all disks full
-                transitionToReadOnlyMode();
+                stateManager.transitionToReadOnlyMode();
             }
 
             @Override
@@ -934,13 +889,13 @@ public class Bookie extends BookieCriticalThread {
             @Override
             public void diskWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable 
again.
-                transitionToWritableMode();
+                stateManager.transitionToWritableMode();
             }
 
             @Override
             public void diskJustWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable 
again.
-                transitionToWritableMode();
+                stateManager.transitionToWritableMode();
             }
         };
     }
@@ -959,163 +914,21 @@ public class Bookie extends BookieCriticalThread {
 
         RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
         return manager.initialize(conf, () -> {
-            rmRegistered.set(false);
+            stateManager.forceToUnregistered();
             // schedule a re-register operation
-            registerBookie(false);
+            stateManager.registerBookie(false);
         }, statsLogger);
     }
 
-    /**
-     * Register as an available bookie.
-     */
-    protected Future<Void> registerBookie(final boolean throwException) {
-        return stateService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws IOException {
-                try {
-                    doRegisterBookie();
-                } catch (IOException ioe) {
-                    if (throwException) {
-                        throw ioe;
-                    } else {
-                        LOG.error("Couldn't register bookie with zookeeper, 
shutting down : ", ioe);
-                        triggerBookieShutdown(ExitCode.ZK_REG_FAIL);
-                    }
-                }
-                return (Void) null;
-            }
-        });
-    }
-
-    protected void doRegisterBookie() throws IOException {
-        doRegisterBookie(forceReadOnly.get() || 
bookieStatus.isInReadOnlyMode());
-    }
-
-    private void doRegisterBookie(boolean isReadOnly) throws IOException {
-        if (null == registrationManager || ((ZKRegistrationManager) 
this.registrationManager).getZk() == null) {
-            // registration manager is null, means not register itself to zk.
-            // ZooKeeper is null existing only for testing.
-            LOG.info("null zk while do register");
-            return;
-        }
-
-        rmRegistered.set(false);
-        try {
-            registrationManager.registerBookie(bookieId, isReadOnly);
-            rmRegistered.set(true);
-        } catch (BookieException e) {
-            throw new IOException(e);
-        }
-    }
-
-
-    /**
-     * Transition the bookie from readOnly mode to writable.
-     */
-    private Future<Void> transitionToWritableMode() {
-        return stateService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                doTransitionToWritableMode();
-                return null;
-            }
-        });
-    }
-
-    @VisibleForTesting
-    public void doTransitionToWritableMode() {
-        if (shuttingdown || forceReadOnly.get()) {
-            return;
-        }
-
-        if (!bookieStatus.setToWritableMode()) {
-            // do nothing if already in writable mode
-            return;
-        }
-        LOG.info("Transitioning Bookie to Writable mode and will serve 
read/write requests.");
-        if (conf.isPersistBookieStatusEnabled()) {
-            
bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
-        }
-        // change zookeeper state only when using zookeeper
-        if (null == registrationManager) {
-            return;
-        }
-        try {
-            doRegisterBookie(false);
-        } catch (IOException e) {
-            LOG.warn("Error in transitioning back to writable mode : ", e);
-            transitionToReadOnlyMode();
-            return;
-        }
-        // clear the readonly state
-        try {
-            registrationManager.unregisterBookie(bookieId, true);
-        } catch (BookieException e) {
-            // if we failed when deleting the readonly flag in zookeeper, it 
is OK since client would
-            // already see the bookie in writable list. so just log the 
exception
-            LOG.warn("Failed to delete bookie readonly state in zookeeper : ", 
e);
-            return;
-        }
-    }
-
-    /**
-     * Transition the bookie to readOnly mode.
-     */
-    private Future<Void> transitionToReadOnlyMode() {
-        return stateService.submit(new Callable<Void>() {
-            @Override
-            public Void call() {
-                doTransitionToReadOnlyMode();
-                return (Void) null;
-            }
-        });
-    }
-
-    @VisibleForTesting
-    public void doTransitionToReadOnlyMode() {
-        if (shuttingdown) {
-            return;
-        }
-        if (!bookieStatus.setToReadOnlyMode()) {
-            return;
-        }
-        if (!conf.isReadOnlyModeEnabled()) {
-            LOG.warn("ReadOnly mode is not enabled. "
-                    + "Can be enabled by configuring "
-                    + "'readOnlyModeEnabled=true' in configuration."
-                    + "Shutting down bookie");
-            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
-            return;
-        }
-        LOG.info("Transitioning Bookie to ReadOnly mode,"
-                + " and will serve only read requests from clients!");
-        // persist the bookie status if we enable this
-        if (conf.isPersistBookieStatusEnabled()) {
-            
this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
-        }
-        // change zookeeper state only when using zookeeper
-        if (null == registrationManager) {
-            return;
-        }
-        try {
-            registrationManager.registerBookie(bookieId, true);
-        } catch (BookieException e) {
-            LOG.error("Error in transition to ReadOnly Mode."
-                    + " Shutting down", e);
-            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
-            return;
-        }
-    }
-
     /*
-     * Check whether Bookie is writable
+     * Check whether Bookie is writable.
      */
     public boolean isReadOnly() {
-        return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
+        return stateManager.isReadOnly();
     }
 
     public boolean isRunning() {
-        return running;
+        return stateManager.isRunning();
     }
 
     @Override
@@ -1137,7 +950,7 @@ public class Bookie extends BookieCriticalThread {
             LOG.warn("Interrupted on running journal thread : ", ie);
         }
         // if the journal thread quits due to shutting down, it is ok
-        if (!shuttingdown) {
+        if (!stateManager.isShuttingDown()) {
             // some error found in journal thread and it quits
             // following add operations to it would hang unit client timeout
             // so we should let bookie server exists
@@ -1175,19 +988,19 @@ public class Bookie extends BookieCriticalThread {
     // when encountering exception
     synchronized int shutdown(int exitCode) {
         try {
-            if (running) { // avoid shutdown twice
+            if (isRunning()) { // avoid shutdown twice
                 // the exitCode only set when first shutdown usually due to 
exception found
                 LOG.info("Shutting down Bookie-{} with exitCode {}",
                          conf.getBookiePort(), exitCode);
                 if (this.exitCode == ExitCode.OK) {
                     this.exitCode = exitCode;
                 }
-                // mark bookie as in shutting down progress
-                shuttingdown = true;
+
+                stateManager.forceToShuttingDown();
 
                 // turn bookie to read only during shutting down process
                 LOG.info("Turning bookie to read only during shut down");
-                this.forceReadOnly.set(true);
+                stateManager.forceToReadOnly();
 
                 // Shutdown Sync thread
                 syncThread.shutdown();
@@ -1219,8 +1032,6 @@ public class Bookie extends BookieCriticalThread {
                     idxMonitor.shutdown();
                 }
 
-                // Shutdown the state service
-                stateService.shutdown();
             }
             // Shutdown the ZK client
             if (registrationManager != null) {
@@ -1231,7 +1042,7 @@ public class Bookie extends BookieCriticalThread {
         } finally {
             // setting running to false here, so watch thread
             // in bookie server know it only after bookie shut down
-            running = false;
+            stateManager.close();
         }
         return this.exitCode;
     }
@@ -1306,7 +1117,7 @@ public class Bookie extends BookieCriticalThread {
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
-            transitionToReadOnlyMode();
+            stateManager.transitionToReadOnlyMode();
             throw new IOException(e);
         } finally {
             long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
@@ -1331,7 +1142,7 @@ public class Bookie extends BookieCriticalThread {
                 handle.setExplicitLac(entry);
             }
         } catch (NoWritableLedgerDirException e) {
-            transitionToReadOnlyMode();
+            stateManager.transitionToReadOnlyMode();
             throw new IOException(e);
         }
     }
@@ -1366,7 +1177,7 @@ public class Bookie extends BookieCriticalThread {
             }
             success = true;
         } catch (NoWritableLedgerDirException e) {
-            transitionToReadOnlyMode();
+            stateManager.transitionToReadOnlyMode();
             throw new IOException(e);
         } finally {
             long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
@@ -1458,6 +1269,11 @@ public class Bookie extends BookieCriticalThread {
         return ledgerStorage;
     }
 
+    @VisibleForTesting
+    public BookieStateManager getStateManager() {
+        return (BookieStateManager) this.stateManager;
+    }
+
     // The rest of the code is test stuff
     static class CounterCallback implements WriteCallback {
         int count;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index bfc24fc..780852a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -2331,8 +2331,8 @@ public class BookieShell implements Tool {
             };
 
             interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
-                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
-            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+                    null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
+            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager, null,
                     checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
 
             int convertedLedgers = 0;
@@ -2420,10 +2420,10 @@ public class BookieShell implements Tool {
                 }
             };
 
-            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
+            dbStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager, null,
                         checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
             interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerIndexManager,
-                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                    null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
             LedgerCache interleavedLedgerCache = 
interleavedStorage.ledgerCache;
 
             EntryLocationIndex dbEntryLocationIndex = 
dbStorage.getEntryLocationIndex();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
new file mode 100644
index 0000000..093dcad
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -0,0 +1,310 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of StateManager.
+ */
+public class BookieStateManager implements StateManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BookieStateManager.class);
+    private final ServerConfiguration conf;
+    private final LedgerDirsManager ledgerDirsManager;
+
+
+    // use an executor to execute the state changes task
+    final ExecutorService stateService = Executors.newSingleThreadExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat("BookieStateManagerService-%d").build());
+
+    // Running flag
+    private volatile boolean running = false;
+    // Flag identify whether it is in shutting down progress
+    private volatile boolean shuttingdown = false;
+    // Bookie status
+    private final BookieStatus bookieStatus = new BookieStatus();
+    private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
+    private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+
+    private final String bookieId;
+    private ShutdownHandler shutdownHandler;
+    private RegistrationManager registrationManager;
+    // Expose Stats
+    private final StatsLogger statsLogger;
+
+
+    public BookieStateManager(ServerConfiguration conf, StatsLogger 
statsLogger,
+           RegistrationManager registrationManager, LedgerDirsManager 
ledgerDirsManager) throws IOException {
+        this.conf = conf;
+        this.statsLogger = statsLogger;
+        this.registrationManager = registrationManager;
+        this.ledgerDirsManager = ledgerDirsManager;
+        // ZK ephemeral node for this Bookie.
+        this.bookieId = getMyId();
+        // 1 : up, 0 : readonly, -1 : unregistered
+        statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                if (!rmRegistered.get()){
+                    return -1;
+                } else if (forceReadOnly.get() || 
bookieStatus.isInReadOnlyMode()) {
+                    return 0;
+                } else {
+                    return 1;
+                }
+            }
+        });
+    }
+
+    @VisibleForTesting
+    BookieStateManager(ServerConfiguration conf, RegistrationManager 
registrationManager) throws IOException {
+        this(conf, NullStatsLogger.INSTANCE, registrationManager, new 
LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()),
+                NullStatsLogger.INSTANCE));
+    }
+
+    @Override
+    public void initState(){
+        if (forceReadOnly.get()) {
+            this.bookieStatus.setToReadOnlyMode();
+        } else if (conf.isPersistBookieStatusEnabled()) {
+            
this.bookieStatus.readFromDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
+        running = true;
+    }
+
+    @Override
+    public void forceToShuttingDown(){
+        // mark bookie as in shutting down progress
+        shuttingdown = true;
+    }
+
+    @Override
+    public void forceToReadOnly(){
+        this.forceReadOnly.set(true);
+    }
+
+    @Override
+    public void forceToUnregistered(){
+        this.rmRegistered.set(false);
+    }
+
+    @Override
+    public boolean isReadOnly(){
+        return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
+    }
+
+    @Override
+    public boolean isRunning(){
+        return running;
+    }
+
+    @Override
+    public boolean isShuttingDown(){
+        return shuttingdown;
+    }
+
+    @Override
+    public void close() {
+        this.running = false;
+        stateService.shutdown();
+    }
+
+    @Override
+    public Future<Void> registerBookie(final boolean throwException) {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException {
+                try {
+                    doRegisterBookie();
+                } catch (IOException ioe) {
+                    if (throwException) {
+                        throw ioe;
+                    } else {
+                        LOG.error("Couldn't register bookie with zookeeper, 
shutting down : ", ioe);
+                        shutdownHandler.shutdown(ExitCode.ZK_REG_FAIL);
+                    }
+                }
+                return (Void) null;
+            }
+        });
+    }
+
+    @Override
+    public Future<Void> transitionToWritableMode() {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception{
+                doTransitionToWritableMode();
+                return null;
+            }
+        });
+    }
+
+    @Override
+    public Future<Void> transitionToReadOnlyMode() {
+        return stateService.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception{
+                doTransitionToReadOnlyMode();
+                return null;
+            }
+        });
+    }
+
+    void doRegisterBookie() throws IOException {
+        doRegisterBookie(forceReadOnly.get() || 
bookieStatus.isInReadOnlyMode());
+    }
+
+    private void doRegisterBookie(boolean isReadOnly) throws IOException {
+        if (null == registrationManager || ((ZKRegistrationManager) 
this.registrationManager).getZk() == null) {
+            // registration manager is null, means not register itself to zk.
+            // ZooKeeper is null existing only for testing.
+            LOG.info("null zk while do register");
+            return;
+        }
+
+        rmRegistered.set(false);
+        try {
+            registrationManager.registerBookie(bookieId, isReadOnly);
+            rmRegistered.set(true);
+        } catch (BookieException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @VisibleForTesting
+    public void doTransitionToWritableMode() {
+        if (shuttingdown || forceReadOnly.get()) {
+            return;
+        }
+
+        if (!bookieStatus.setToWritableMode()) {
+            // do nothing if already in writable mode
+            return;
+        }
+        LOG.info("Transitioning Bookie to Writable mode and will serve 
read/write requests.");
+        if (conf.isPersistBookieStatusEnabled()) {
+            
bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
+        // change zookeeper state only when using zookeeper
+        if (null == registrationManager) {
+            return;
+        }
+        try {
+            doRegisterBookie(false);
+        } catch (IOException e) {
+            LOG.warn("Error in transitioning back to writable mode : ", e);
+            transitionToReadOnlyMode();
+            return;
+        }
+        // clear the readonly state
+        try {
+            registrationManager.unregisterBookie(bookieId, true);
+        } catch (BookieException e) {
+            // if we failed when deleting the readonly flag in zookeeper, it 
is OK since client would
+            // already see the bookie in writable list. so just log the 
exception
+            LOG.warn("Failed to delete bookie readonly state in zookeeper : ", 
e);
+            return;
+        }
+    }
+
+    @VisibleForTesting
+    public void doTransitionToReadOnlyMode() {
+        if (shuttingdown) {
+            return;
+        }
+        if (!bookieStatus.setToReadOnlyMode()) {
+            return;
+        }
+        if (!conf.isReadOnlyModeEnabled()) {
+            LOG.warn("ReadOnly mode is not enabled. "
+                    + "Can be enabled by configuring "
+                    + "'readOnlyModeEnabled=true' in configuration."
+                    + " Shutting down bookie");
+            shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        }
+        LOG.info("Transitioning Bookie to ReadOnly mode,"
+                + " and will serve only read requests from clients!");
+        // persist the bookie status if we enable this
+        if (conf.isPersistBookieStatusEnabled()) {
+            
this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
+        }
+        // change zookeeper state only when using zookeeper
+        if (null == registrationManager) {
+            return;
+        }
+        try {
+            registrationManager.registerBookie(bookieId, true);
+        } catch (BookieException e) {
+            LOG.error("Error in transition to ReadOnly Mode."
+                    + " Shutting down", e);
+            shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        }
+    }
+    public void setShutdownHandler(ShutdownHandler handler){
+        shutdownHandler = handler;
+    }
+
+    private String getMyId() throws UnknownHostException {
+        return Bookie.getBookieAddress(conf).toString();
+    }
+
+    @VisibleForTesting
+    public void setRegistrationManager(RegistrationManager rm) {
+        this.registrationManager = rm;
+    }
+    @VisibleForTesting
+    public ShutdownHandler getShutdownHandler(){
+        return shutdownHandler;
+    }
+    @VisibleForTesting
+    boolean isRegistered(){
+        return rmRegistered.get();
+    }
+}
+
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index bcb5dc5..5d5d74e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -90,6 +90,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
                            LedgerManager ledgerManager,
                            LedgerDirsManager ledgerDirsManager,
                            LedgerDirsManager indexDirsManager,
+                           StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
                            StatsLogger statsLogger)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index ac0df00..83ac2c0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -45,6 +45,7 @@ public interface LedgerStorage {
                     LedgerManager ledgerManager,
                     LedgerDirsManager ledgerDirsManager,
                     LedgerDirsManager indexDirsManager,
+                    StateManager stateManager,
                     CheckpointSource checkpointSource,
                     Checkpointer checkpointer,
                     StatsLogger statsLogger)
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 3970a1d..464f127 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.KeeperException;
@@ -42,8 +41,22 @@ public class ReadOnlyBookie extends Bookie {
     public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
             throws IOException, KeeperException, InterruptedException, 
BookieException {
         super(conf, statsLogger);
+        stateManager = new BookieStateManager(conf, statsLogger, 
registrationManager, getLedgerDirsManager()) {
+
+            @Override
+            public void doTransitionToWritableMode() {
+                // no-op
+                LOG.info("Skip transition to writable mode for readonly 
bookie");
+            }
+
+            @Override
+            public void doTransitionToReadOnlyMode() {
+                // no-op
+                LOG.info("Skip transition to readonly mode for readonly 
bookie");
+            }
+        };
         if (conf.isReadOnlyModeEnabled()) {
-            forceReadOnly.set(true);
+            stateManager.forceToReadOnly();
         } else {
             String err = "Try to init ReadOnly Bookie, while ReadOnly mode is 
not enabled";
             LOG.error(err);
@@ -51,16 +64,4 @@ public class ReadOnlyBookie extends Bookie {
         }
         LOG.info("Running bookie in force readonly mode.");
     }
-
-    @Override
-    public void doTransitionToWritableMode() {
-        // no-op
-        LOG.info("Skip transition to writable mode for readonly bookie");
-    }
-
-    @Override
-    public void doTransitionToReadOnlyMode() {
-        // no-op
-        LOG.info("Skip transition to readonly mode for readonly bookie");
-    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index b0c6bad..0e3e3b9 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -47,6 +47,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
 
     EntryMemTable memTable;
     private ScheduledExecutorService scheduler;
+    private StateManager stateManager;
 
     public SortedLedgerStorage() {
         super();
@@ -57,6 +58,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
                            LedgerManager ledgerManager,
                            LedgerDirsManager ledgerDirsManager,
                            LedgerDirsManager indexDirsManager,
+                           StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
                            StatsLogger statsLogger)
@@ -66,6 +68,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
             ledgerManager,
             ledgerDirsManager,
             indexDirsManager,
+            stateManager,
             checkpointSource,
             checkpointer,
             statsLogger);
@@ -74,6 +77,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
                 new ThreadFactoryBuilder()
                 .setNameFormat("SortedLedgerStorage-%d")
                 .setPriority((Thread.NORM_PRIORITY + Thread.MAX_PRIORITY) / 
2).build());
+        this.stateManager = stateManager;
     }
 
     @VisibleForTesting
@@ -218,8 +222,7 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
                         checkpointer.startCheckpoint(cp);
                     }
                 } catch (IOException e) {
-                    // TODO: if we failed to flush data, we should switch the 
bookie back to readonly mode
-                    //       or shutdown it. {@link 
https://github.com/apache/bookkeeper/issues/280}
+                    stateManager.transitionToReadOnlyMode();
                     LOG.error("Exception thrown while flushing skip list 
cache.", e);
                 }
             }
@@ -233,4 +236,9 @@ public class SortedLedgerStorage extends 
InterleavedLedgerStorage
         // can happen because compaction. in a sorted ledger storage, 
checkpoint should happen after the data is
         // flushed to the entry log file.
     }
+
+    BookieStateManager getStateManager(){
+        return (BookieStateManager) stateManager;
+    }
+
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
new file mode 100644
index 0000000..ad4ac0c
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.concurrent.Future;
+
+/**
+ * State management of a Bookie, including registration and transitions between writable and read-only mode.
+ */
+public interface StateManager extends AutoCloseable {
+
+
+    /**
+     * Initialize the state of the Bookie when the bookie is launched.
+     */
+    void initState();
+
+    /**
+     * Check whether the bookie is in read-only mode.
+     */
+    boolean isReadOnly();
+
+    /**
+     * Check whether the bookie is running.
+     */
+    boolean isRunning();
+
+    /**
+     * Check whether the bookie is shutting down.
+     */
+    boolean isShuttingDown();
+
+    /**
+     * Close the manager, release its resources.
+     */
+    @Override
+    void close();
+
+    /**
+     * Register the bookie with the RegistrationManager.
+     * @param throwException whether to throw an exception or not
+     */
+    Future<Void> registerBookie(boolean throwException);
+
+    // The forceTo* methods below should be called inside Bookie;
+    // they indicate important state of the bookie and should become visible quickly.
+    /**
+     * Turn the state to shutting down; this only sets the flag.
+     */
+    void forceToShuttingDown();
+
+    /**
+     * Turn the state to read-only; this only sets the flag.
+     */
+    void forceToReadOnly();
+
+    /**
+     * Turn the state to not registered; this only sets the flag.
+     */
+    void forceToUnregistered();
+
+    /**
+     * Change the state of bookie to Writable mode.
+     */
+    Future<Void> transitionToWritableMode();
+
+    /**
+     * Change the state of bookie to ReadOnly mode.
+     */
+    Future<Void> transitionToReadOnlyMode();
+
+    /**
+     * ShutdownHandler used to shut down the bookie.
+     */
+    interface ShutdownHandler {
+        void shutdown(int code);
+    }
+
+    void setShutdownHandler(ShutdownHandler handler);
+}
+
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index b0257ec..4852fbe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
 import 
org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.common.util.Watcher;
@@ -136,8 +137,8 @@ public class DbLedgerStorage implements 
CompactableLedgerStorage {
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager 
ledgerManager, LedgerDirsManager ledgerDirsManager,
-            LedgerDirsManager indexDirsManager, CheckpointSource 
checkpointSource, Checkpointer checkpointer,
-            StatsLogger statsLogger) throws IOException {
+        LedgerDirsManager indexDirsManager, StateManager stateManager, 
CheckpointSource checkpointSource,
+                           Checkpointer checkpointer, StatsLogger statsLogger) 
throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 0c240da..2e42b05 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -109,8 +108,9 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         }
 
         void testRegisterBookie(ServerConfiguration conf) throws IOException {
-            super.doRegisterBookie();
+            super.getStateManager().doRegisterBookie();
         }
+
     }
 
     /**
@@ -133,7 +133,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
                     BookieException {
                 MockBookie bookie = new MockBookie(conf);
                 rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-                bookie.registrationManager = rm;
+                bookie.setRegistrationManager(rm);
                 ((ZKRegistrationManager) 
bookie.registrationManager).setZk(zkc);
                 ((ZKRegistrationManager) 
bookie.registrationManager).getZk().close();
                 return bookie;
@@ -162,8 +162,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         MockBookie b = new MockBookie(conf);
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.registrationManager = rm;
-
+        b.setRegistrationManager(rm);
         b.testRegisterBookie(conf);
         ZooKeeper zooKeeper = ((ZKRegistrationManager) rm).getZk();
         assertNotNull("Bookie registration node doesn't exists!",
@@ -196,8 +195,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
 
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.registrationManager = rm;
-
+        b.setRegistrationManager(rm);
         b.testRegisterBookie(conf);
 
         Stat bkRegNode1 = ((ZKRegistrationManager) 
rm).getZk().exists(bkRegPath, false);
@@ -208,7 +206,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         // zkclient and doing the registration.
         RegistrationManager newRm = new ZKRegistrationManager();
         newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.registrationManager = newRm;
+        b.setRegistrationManager(newRm);
 
         try (ZooKeeperClient newZk = createNewZKClient()) {
             // deleting the znode, so that the bookie registration should
@@ -308,8 +306,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
 
         conf.setZkServers(zkUtil.getZooKeeperConnectString());
         rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.registrationManager = rm;
-
+        b.setRegistrationManager(rm);
         b.testRegisterBookie(conf);
         Stat bkRegNode1 = zkc.exists(bkRegPath, false);
         assertNotNull("Bookie registration node doesn't exists!",
@@ -320,7 +317,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         ZooKeeperClient newzk = createNewZKClient();
         RegistrationManager newRm = new ZKRegistrationManager();
         newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.registrationManager = newRm;
+        b.setRegistrationManager(newRm);
         try {
             b.testRegisterBookie(conf);
             fail("Should throw NodeExistsException as the znode is not getting 
expired");
@@ -814,7 +811,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         Bookie bookie = bookieServer.getBookie();
         assertFalse(bookie.isReadOnly());
         // transition to readonly mode, bookie status should be persisted in 
ledger disks
-        bookie.doTransitionToReadOnlyMode();
+        bookie.getStateManager().doTransitionToReadOnlyMode();
         assertTrue(bookie.isReadOnly());
 
         // restart bookie should start in read only mode
@@ -824,7 +821,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         bookie = bookieServer.getBookie();
         assertTrue(bookie.isReadOnly());
         // transition to writable mode
-        bookie.doTransitionToWritableMode();
+        bookie.getStateManager().doTransitionToWritableMode();
         // restart bookie should start in writable mode
         bookieServer.shutdown();
         bookieServer = new BookieServer(conf);
@@ -850,8 +847,8 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         bookieServer.start();
         Bookie bookie = bookieServer.getBookie();
         // persist bookie status
-        bookie.doTransitionToReadOnlyMode();
-        bookie.doTransitionToWritableMode();
+        bookie.getStateManager().doTransitionToReadOnlyMode();
+        bookie.getStateManager().doTransitionToWritableMode();
         assertFalse(bookie.isReadOnly());
         bookieServer.shutdown();
         // start read only bookie
@@ -863,7 +860,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         bookie = bookieServer.getBookie();
         assertTrue(bookie.isReadOnly());
         // transition to writable should fail
-        bookie.doTransitionToWritableMode();
+        bookie.getStateManager().doTransitionToWritableMode();
         assertTrue(bookie.isReadOnly());
         bookieServer.shutdown();
     }
@@ -892,7 +889,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         // transition in to read only and persist the status on disk
         Bookie bookie = bookieServer.getBookie();
         assertFalse(bookie.isReadOnly());
-        bookie.doTransitionToReadOnlyMode();
+        bookie.getStateManager().doTransitionToReadOnlyMode();
         assertTrue(bookie.isReadOnly());
         // corrupt status file
         List<File> ledgerDirs = 
bookie.getLedgerDirsManager().getAllLedgerDirs();
@@ -931,7 +928,7 @@ public class BookieInitializationTest extends 
BookKeeperClusterTestCase {
         // transition in to read only and persist the status on disk
         Bookie bookie = bookieServer.getBookie();
         assertFalse(bookie.isReadOnly());
-        bookie.doTransitionToReadOnlyMode();
+        bookie.getStateManager().doTransitionToReadOnlyMode();
         assertTrue(bookie.isReadOnly());
         // Manually update a status file, so it becomes the latest
         Thread.sleep(1);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index a678139..8fb7980 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -25,10 +25,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +38,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -235,7 +232,6 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
                 // Do nothing.
             }
         };
-
         for (File journalDir : conf.getJournalDirs()) {
             Bookie.checkDirectoryStructure(journalDir);
         }
@@ -248,6 +244,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
             LedgerManagerFactory.newLedgerManagerFactory(conf, 
zkc).newLedgerManager(),
             dirManager,
             dirManager,
+            null,
             cp,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
@@ -636,6 +633,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
             manager,
             dirs,
             dirs,
+            null,
             checkpointSource,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
@@ -660,7 +658,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
         storage.initialize(
             conf,
             manager,
-            dirs, dirs,
+            dirs, dirs, null,
             checkpointSource,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
@@ -684,6 +682,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
             manager,
             dirs,
             dirs,
+            null,
             checkpointSource,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
@@ -788,6 +787,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
             manager,
             dirs,
             dirs,
+            null,
             checkpointSource,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
@@ -843,6 +843,7 @@ public abstract class CompactionTest extends 
BookKeeperClusterTestCase {
             LedgerManagerFactory.newLedgerManagerFactory(conf, 
zkc).newLedgerManager(),
             dirManager,
             dirManager,
+            null,
             cp,
             Checkpointer.NULL,
             NullStatsLogger.INSTANCE);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index b14358b..72e7d27 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -21,6 +21,7 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookieException.Code.OK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -36,13 +37,14 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.IOUtils;
@@ -474,6 +476,7 @@ public class LedgerCacheTest {
                                LedgerManager ledgerManager,
                                LedgerDirsManager ledgerDirsManager,
                                LedgerDirsManager indexDirsManager,
+                               StateManager stateManager,
                                CheckpointSource checkpointSource,
                                Checkpointer checkpointer,
                                StatsLogger statsLogger) throws IOException {
@@ -482,6 +485,7 @@ public class LedgerCacheTest {
                 ledgerManager,
                 ledgerDirsManager,
                 indexDirsManager,
+                stateManager,
                 checkpointSource,
                 checkpointer,
                 statsLogger);
@@ -500,6 +504,20 @@ public class LedgerCacheTest {
             }
             super.process(ledgerId, entryId, buffer);
         }
+        // simplified memTable full callback.
+        @Override
+        public void onSizeLimitReached(final CheckpointSource.Checkpoint cp) 
throws IOException {
+            LOG.info("Reached size {}", cp);
+            // flush synchronously
+            try {
+                LOG.info("Started flushing mem table.");
+                memTable.flush(FlushTestSortedLedgerStorage.this);
+            } catch (IOException e) {
+                getStateManager().doTransitionToReadOnlyMode();
+                LOG.error("Exception thrown while flushing skip list cache.", 
e);
+         }
+         }
+
     }
 
     @Test
@@ -523,7 +541,6 @@ public class LedgerCacheTest {
         // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls 
will fail
         // because of BOOKKEEPER-965 change.
         bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), 
null, "passwd".getBytes());
-
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         assertFalse("Bookie is expected to be in ReadWrite mode", 
bookie.isReadOnly());
         assertTrue("EntryMemTable SnapShot is expected to be empty", 
memTable.snapshot.isEmpty());
@@ -548,6 +565,51 @@ public class LedgerCacheTest {
                 memTable.snapshot.isEmpty());
     }
 
+    @Test
+    public void testSortedLedgerFlushFailure() throws Exception {
+        // most of the code is the same as in testEntryMemTableFlushFailure
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime)
+            .setLedgerDirNames(new String[] { tmpDir.toString() })
+            .setJournalDirName(tmpDir.toString())
+            
.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+
+        Bookie bookie = new Bookie(conf);
+        bookie.start();
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = 
(FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), 
null, "passwd".getBytes());
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+        assertFalse("Bookie is expected to be in ReadWrite mode", 
bookie.isReadOnly());
+        assertTrue("EntryMemTable SnapShot is expected to be empty", 
memTable.snapshot.isEmpty());
+
+        // set flags, so that FlushTestSortedLedgerStorage simulates 
FlushFailure scenario
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+
+        // since we simulated sizeLimitReached, snapshot shouldn't be empty
+        assertFalse("EntryMemTable SnapShot is not expected to be empty", 
memTable.snapshot.isEmpty());
+        // after flush failure, the bookie is set to readOnly
+        assertTrue("Bookie is expected to be in Read mode", 
bookie.isReadOnly());
+        // a subsequent write is expected to fail
+        bookie.addEntry(generateEntry(1, 3), new 
BookkeeperInternalCallbacks.WriteCallback(){
+            public void writeComplete(int rc, long ledgerId, long entryId, 
BookieSocketAddress addr, Object ctx){
+                LOG.info("fail write to bk");
+                assertTrue(rc != OK);
+            };
+
+        }, null, "passwd".getBytes());
+        bookie.shutdown();
+
+    }
+
     private ByteBuf generateEntry(long ledger, long entry) {
         byte[] data = ("ledger-" + ledger + "-" + entry).getBytes();
         ByteBuf bb = Unpooled.buffer(8 + 8 + data.length);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index c70b0e0..2dfebdf 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -119,11 +119,13 @@ public class SortedLedgerStorageCheckpointTest extends 
LedgerStorageTestBase {
                 log.error("Failed to checkpoint at {}", checkpoint, e);
             }
         });
+        // if the SortedLedgerStorage does not need to change the bookie's state, passing a 
null StateManager is ok
         this.storage.initialize(
             conf,
             mock(LedgerManager.class),
             ledgerDirsManager,
             ledgerDirsManager,
+            null,
             checkpointSrc,
             checkpointer,
             NullStatsLogger.INSTANCE);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
new file mode 100644
index 0000000..8c8b636
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -0,0 +1,258 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static 
org.apache.bookkeeper.bookie.BookieException.Code.MetadataStoreException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testing StateManager cases.
+ */
+public class StateManagerTest extends BookKeeperClusterTestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(StateManagerTest.class);
+
+    @Rule
+    public final TestName runtime = new TestName();
+    final ServerConfiguration conf;
+    MockZKRegistrationManager rm;
+
+    public StateManagerTest(){
+        super(0);
+        String ledgersPath = "/" + "ledgers" + runtime.getMethodName();
+        baseClientConf.setZkLedgersRootPath(ledgersPath);
+        baseConf.setZkLedgersRootPath(ledgersPath);
+        conf = TestBKConfiguration.newServerConfiguration();
+
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        zkUtil.createBKEnsemble("/" + runtime.getMethodName());
+        rm = new MockZKRegistrationManager();
+        File tmpDir = createTempDir("stateManger", "test");
+        conf.setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setJournalDirName(tmpDir.toString())
+                .setZkServers(zkUtil.getZooKeeperConnectString());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (rm != null) {
+            rm.close();
+        }
+    }
+
+    private static class MockZKRegistrationManager extends 
ZKRegistrationManager {
+        boolean registerFailed = false;
+        void setRegisterFail(boolean failOrNot){
+            registerFailed = failOrNot;
+        }
+        @Override
+        public void registerBookie(String bookieId, boolean readOnly) throws 
BookieException {
+            if (registerFailed) {
+                throw BookieException.create(MetadataStoreException);
+            }
+            super.registerBookie(bookieId, readOnly);
+        }
+
+    }
+
+    /**
+     * Bookie should shut down when it fails to register with the registration service.
+     * On a ZooKeeper exception, it should return exit code ZK_REG_FAIL = 4.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+        File tmpDir = createTempDir("stateManger", "test");
+
+        final ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(tmpDir.getPath())
+            .setLedgerDirNames(new String[] { tmpDir.getPath() })
+            .setJournalDirName(tmpDir.toString())
+            .setZkServers(zkUtil.getZooKeeperConnectString());
+
+        BookieServer bkServer = new BookieServer(conf) {
+            protected Bookie newBookie(ServerConfiguration conf)
+                    throws IOException, KeeperException, InterruptedException,
+                    BookieException {
+                Bookie bookie = new Bookie(conf);
+                rm.setRegisterFail(true);
+                rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+                bookie.setRegistrationManager(rm);
+                return bookie;
+            }
+        };
+        bkServer.start();
+        bkServer.join();
+        assertTrue("Failed to return failCode ZK_REG_FAIL",
+                ExitCode.ZK_REG_FAIL == bkServer.getExitCode());
+    }
+
+    /**
+     * StateManager can transition between writable mode and readOnly mode if 
it was not created with readOnly mode.
+     */
+    @Test
+    public void testNormalBookieTransitions() throws Exception {
+        BookieStateManager stateManager = new BookieStateManager(conf, rm);
+        rm.initialize(conf, () -> {
+            stateManager.forceToUnregistered();
+            // schedule a re-register operation
+            stateManager.registerBookie(false);
+        }, NullStatsLogger.INSTANCE);
+
+        stateManager.initState();
+        stateManager.registerBookie(true).get();
+
+        assertTrue(stateManager.isRunning());
+        assertTrue(stateManager.isRegistered());
+
+        stateManager.transitionToReadOnlyMode().get();
+        assertTrue(stateManager.isReadOnly());
+
+        stateManager.transitionToWritableMode().get();
+        assertTrue(stateManager.isRunning());
+        assertFalse(stateManager.isReadOnly());
+
+        stateManager.close();
+        assertFalse(stateManager.isRunning());
+    }
+
+    @Test
+    public void testReadOnlyDisableBookieTransitions() throws Exception {
+        conf.setReadOnlyModeEnabled(false);
+        // stateManager of a bookie that has readOnly mode disabled
+        BookieStateManager stateManager = new BookieStateManager(conf, rm);
+        // simulate sync shutdown logic in bookie
+        stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
+            @Override
+            public void shutdown(int code) {
+                try {
+                    if (stateManager.isRunning()) {
+                        stateManager.forceToShuttingDown();
+                        stateManager.forceToReadOnly();
+                    }
+
+                } finally {
+                    stateManager.close();
+                }
+            }
+        });
+        rm.initialize(conf, () -> {
+            stateManager.forceToUnregistered();
+            // schedule a re-register operation
+            stateManager.registerBookie(false);
+        }, NullStatsLogger.INSTANCE);
+
+        stateManager.initState();
+        stateManager.registerBookie(true).get();
+        assertTrue(stateManager.isRunning());
+
+        stateManager.transitionToReadOnlyMode().get();
+        // stateManager will shut down
+        assertFalse(stateManager.isRunning());
+        // different dimension of bookie state: running <--> down, read <--> 
write, unregistered <--> registered
+        // the state is set to readOnly when shutting down
+        assertTrue(stateManager.isReadOnly());
+    }
+
+    @Test
+    public void testReadOnlyBookieTransitions() throws Exception{
+        // read-only bookie, which uses an overriding stateManager impl
+        File tmpDir = createTempDir("stateManger", "test-readonly");
+        final ServerConfiguration readOnlyConf = 
TestBKConfiguration.newServerConfiguration();
+        readOnlyConf.setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setJournalDirName(tmpDir.toString())
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setForceReadOnlyBookie(true);
+        ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, 
NullStatsLogger.INSTANCE);
+        readOnlyBookie.start();
+        assertTrue(readOnlyBookie.isRunning());
+        assertTrue(readOnlyBookie.isReadOnly());
+
+        // the transition has no effect if the bookie started in readOnly mode
+        readOnlyBookie.getStateManager().transitionToWritableMode().get();
+        assertTrue(readOnlyBookie.isRunning());
+        assertTrue(readOnlyBookie.isReadOnly());
+        readOnlyBookie.shutdown();
+
+    }
+
+    /**
+     * Verify the bookie registration.
+     */
+    @Test
+    public void testRegistration() throws Exception {
+        BookieStateManager stateManager = new BookieStateManager(conf, rm);
+        rm.initialize(conf, () -> {
+            stateManager.forceToUnregistered();
+            // schedule a re-register operation
+            stateManager.registerBookie(false);
+        }, NullStatsLogger.INSTANCE);
+        // simulate sync shutdown logic in bookie
+        stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
+            @Override
+            public void shutdown(int code) {
+                try {
+                    if (stateManager.isRunning()) {
+                        stateManager.forceToShuttingDown();
+                        stateManager.forceToReadOnly();
+                    }
+
+                } finally {
+                    stateManager.close();
+                }
+            }
+        });
+        stateManager.initState();
+        // up
+        assertTrue(stateManager.isRunning());
+        // unregistered
+        assertFalse(stateManager.isRegistered());
+
+        stateManager.registerBookie(true).get();
+        // registered
+        assertTrue(stateManager.isRegistered());
+        stateManager.getShutdownHandler().shutdown(ExitCode.OK);
+        // readOnly
+        assertTrue(stateManager.isReadOnly());
+    }
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index ea60aa8..e6cb93d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -271,6 +271,7 @@ public class TestSyncThread {
             LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager,
+            StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
             StatsLogger statsLogger)
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index a954f28..75e061a 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -88,7 +88,7 @@ public class ConversionRollbackTest {
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
 
         DbLedgerStorage dbStorage = new DbLedgerStorage();
-        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, 
checkpointSource, checkpointer,
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, 
null, checkpointSource, checkpointer,
                 NullStatsLogger.INSTANCE);
 
         // Insert some ledger & entries in the dbStorage
@@ -118,8 +118,8 @@ public class ConversionRollbackTest {
 
         // Verify that interleaved storage index has the same entries
         InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
-        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager,
+                null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
 
         Set<Long> ledgers = 
Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 
4L)), ledgers);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index df5434c..1816945 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -85,8 +85,8 @@ public class ConversionTest {
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
 
         InterleavedLedgerStorage interleavedStorage = new 
InterleavedLedgerStorage();
-        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager,
+                null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
 
         // Insert some ledger & entries in the interleaved storage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -115,12 +115,12 @@ public class ConversionTest {
 
         // Verify that db index has the same entries
         DbLedgerStorage dbStorage = new DbLedgerStorage();
-        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, 
checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
+                null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
 
         interleavedStorage = new InterleavedLedgerStorage();
-        interleavedStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+        interleavedStorage.initialize(conf, null, ledgerDirsManager,
+                ledgerDirsManager, null, checkpointSource, checkpointer, 
NullStatsLogger.INSTANCE);
 
         Set<Long> ledgers = 
Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 
4L)), ledgers);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 4793ec7..c1f7e39 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -86,7 +86,7 @@ public class LocationsIndexRebuildTest {
                 new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()));
 
         DbLedgerStorage ledgerStorage = new DbLedgerStorage();
-        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, null, checkpointSource, checkpointer,
                 NullStatsLogger.INSTANCE);
 
         // Insert some ledger & entries in the storage
@@ -116,7 +116,7 @@ public class LocationsIndexRebuildTest {
 
         // Verify that db index has the same entries
         ledgerStorage = new DbLedgerStorage();
-        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, checkpointSource, checkpointer,
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, 
ledgerDirsManager, null, checkpointSource, checkpointer,
                 NullStatsLogger.INSTANCE);
 
         Set<Long> ledgers = 
Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 730490b..14bb9cf 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -56,6 +56,7 @@ import org.apache.bookkeeper.bookie.GarbageCollector;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
+import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -547,6 +548,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
             LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager,
+            StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
             StatsLogger statsLogger) throws IOException {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 1ecd9f3..32be5fc 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -126,6 +127,7 @@ public abstract class LedgerManagerTestCase extends 
BookKeeperClusterTestCase {
             LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager,
+            StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
             StatsLogger statsLogger) throws IOException {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index b9f46e4..c2e3f3e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -69,6 +68,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Tests publishing of under replicated ledgers by the Auditor bookie node when
  * corresponding bookies identifes as not running.
@@ -295,7 +295,7 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         ServerConfiguration bookieConf = bsConfs.get(2);
         BookieServer bk = bs.get(2);
         bookieConf.setReadOnlyModeEnabled(true);
-        bk.getBookie().doTransitionToReadOnlyMode();
+        bk.getBookie().getStateManager().doTransitionToReadOnlyMode();
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for Auditor to finish ledger check.");
@@ -321,7 +321,7 @@ public class AuditorLedgerCheckerTest extends 
BookKeeperClusterTestCase {
         ServerConfiguration bookieConf = bsConfs.get(bkIndex);
         BookieServer bk = bs.get(bkIndex);
         bookieConf.setReadOnlyModeEnabled(true);
-        bk.getBookie().doTransitionToReadOnlyMode();
+        bk.getBookie().getStateManager().doTransitionToReadOnlyMode();
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for Auditor to finish ledger check.");
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 300b6f0..5ed514f 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -326,7 +326,7 @@ public abstract class BookKeeperClusterTestCase {
     public void setBookieToReadOnly(BookieSocketAddress addr) throws 
InterruptedException, UnknownHostException {
         for (BookieServer server : bs) {
             if (server.getLocalAddress().equals(addr)) {
-                server.getBookie().doTransitionToReadOnlyMode();
+                
server.getBookie().getStateManager().doTransitionToReadOnlyMode();
                 break;
             }
         }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 8ee0343..a34bbcb 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -24,11 +24,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.util.Enumeration;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
@@ -260,7 +258,7 @@ public class ReadOnlyBookieTest extends 
BookKeeperClusterTestCase {
         killBookie(1);
         baseConf.setReadOnlyModeEnabled(true);
         startNewBookie();
-        bs.get(1).getBookie().doTransitionToReadOnlyMode();
+        bs.get(1).getBookie().getStateManager().doTransitionToReadOnlyMode();
         try {
             bkc.waitForReadOnlyBookie(Bookie.getBookieAddress(bsConfs.get(1)))
                 .get(30, TimeUnit.SECONDS);

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to