This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d3cf8  ISSUE #1314: Provide a mechanism to allow high priority 
writes to readonly bookies
08d3cf8 is described below

commit 08d3cf8f014451dbc1485c8fc4985b2ada30a55c
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Apr 5 00:02:59 2018 -0700

    ISSUE #1314: Provide a mechanism to allow high priority writes to readonly 
bookies
    
    Descriptions of the changes in this PR:
    
    *Problem*
    
    Currently we allow fence requests going through readonly bookies, since 
fence requests are special read requests. However a ledger can only be sealed 
after it is successfully recovered. If bookies are in readonly, those recovery 
writes won't go through.
    
    If there is a bookie outage happened (e.g. all bookies are readonly), all 
ledgers are not able to be sealed. It might be good to have a similar setting 
like minUsableSizeForIndexFileCreation for recovery writes.
    
    This can improve operability during outage.
    
    *Solution*
    
    - add a setting `minUsableSizeForHighPriorityWrites` to allow accepting 
high priority writes on readonly bookies.
    
    Master Issue: #1314
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Matteo Merli 
<[email protected]>
    
    This closes #1315 from sijie/allow_high_priority_writes, closes #1314
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  24 +-
 .../bookkeeper/bookie/BookieStateManager.java      |  12 +-
 .../org/apache/bookkeeper/bookie/BookieStatus.java |   3 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  25 ---
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  41 ----
 .../bookie/InterleavedLedgerStorage.java           |  11 +-
 .../bookkeeper/bookie/LedgerDirsManager.java       |  20 +-
 .../bookkeeper/bookie/LedgerDirsMonitor.java       |  10 +-
 .../org/apache/bookkeeper/bookie/StateManager.java |  15 +-
 .../org/apache/bookkeeper/bookie/SyncThread.java   |   8 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  26 ++-
 .../bookkeeper/proto/BookieRequestProcessor.java   |   7 +-
 .../bookkeeper/proto/PacketProcessorBaseV3.java    |   6 +-
 .../bookkeeper/proto/WriteEntryProcessor.java      |  23 +-
 .../bookkeeper/proto/WriteEntryProcessorV3.java    |  16 +-
 .../bookie/BookieStorageThresholdTest.java         |  18 --
 .../bookkeeper/bookie/TestLedgerDirsManager.java   |  49 +++--
 .../apache/bookkeeper/bookie/TestSyncThread.java   |  41 +---
 .../bookkeeper/proto/WriteEntryProcessorTest.java  | 224 +++++++++++++++++++
 .../proto/WriteEntryProcessorV3Test.java           | 245 +++++++++++++++++++++
 20 files changed, 624 insertions(+), 200 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 7322173..b391c53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -886,24 +886,15 @@ public class Bookie extends BookieCriticalThread {
         return new LedgerDirsListener() {
 
             @Override
-            public void diskFull(File disk) {
-                // Nothing needs to be handled here.
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // Nothing needs to be handled here.
-            }
-
-            @Override
             public void diskFailed(File disk) {
                 // Shutdown the bookie on disk failure.
                 triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
             }
 
             @Override
-            public void allDisksFull() {
+            public void allDisksFull(boolean highPriorityWritesAllowed) {
                 // Transition to readOnly mode on all disks full
+                
stateManager.setHighPriorityWritesAvailability(highPriorityWritesAllowed);
                 stateManager.transitionToReadOnlyMode();
             }
 
@@ -916,12 +907,14 @@ public class Bookie extends BookieCriticalThread {
             @Override
             public void diskWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable 
again.
+                stateManager.setHighPriorityWritesAvailability(true);
                 stateManager.transitionToWritableMode();
             }
 
             @Override
             public void diskJustWritable(File disk) {
                 // Transition to writable mode when a disk becomes writable 
again.
+                stateManager.setHighPriorityWritesAvailability(true);
                 stateManager.transitionToWritableMode();
             }
         };
@@ -962,6 +955,15 @@ public class Bookie extends BookieCriticalThread {
         return stateManager.isReadOnly();
     }
 
+    /**
+     * Check whether Bookie is available for high priority writes.
+     *
+     * @return true if the bookie is able to take high priority writes.
+     */
+    public boolean isAvailableForHighPriorityWrites() {
+        return stateManager.isAvailableForHighPriorityWrites();
+    }
+
     public boolean isRunning() {
         return stateManager.isRunning();
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 798db41..1689b11 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -61,6 +61,7 @@ public class BookieStateManager implements StateManager {
     private final BookieStatus bookieStatus = new BookieStatus();
     private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
     private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+    private volatile boolean availableForHighPriorityWrites = true;
 
     private final String bookieId;
     private ShutdownHandler shutdownHandler;
@@ -68,7 +69,6 @@ public class BookieStateManager implements StateManager {
     // Expose Stats
     private final StatsLogger statsLogger;
 
-
     public BookieStateManager(ServerConfiguration conf, StatsLogger 
statsLogger,
            MetadataBookieDriver metadataDriver, LedgerDirsManager 
ledgerDirsManager) throws IOException {
         this.conf = conf;
@@ -136,6 +136,16 @@ public class BookieStateManager implements StateManager {
     }
 
     @Override
+    public boolean isAvailableForHighPriorityWrites() {
+        return availableForHighPriorityWrites;
+    }
+
+    @Override
+    public void setHighPriorityWritesAvailability(boolean available) {
+        this.availableForHighPriorityWrites = available;
+    }
+
+    @Override
     public boolean isRunning(){
         return running;
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
index 7c702da..49aa66e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
@@ -45,7 +45,7 @@ public class BookieStatus {
 
     enum BookieMode {
         READ_ONLY,
-        READ_WRITE;
+        READ_WRITE
     }
 
     private static final long INVALID_UPDATE_TIME = -1;
@@ -54,7 +54,6 @@ public class BookieStatus {
     private long lastUpdateTime;
     private volatile BookieMode bookieMode;
 
-
     BookieStatus() {
         this.bookieMode = BookieMode.READ_WRITE;
         this.layoutVersion = CURRENT_STATUS_LAYOUT_VERSION;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 2055758..66efbaa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -439,31 +439,6 @@ public class EntryLogger {
                     shouldCreateNewEntryLog.set(true);
                 }
             }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskJustWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
         };
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 05b5eb5..eb3b935 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -42,7 +42,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -108,7 +107,6 @@ public class IndexPersistenceMgr {
         LOG.info("openFileLimit = {}", openFileLimit);
         // Retrieve all of the active ledgers.
         getActiveLedgers();
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
 
         // build the file info cache
         int concurrencyLevel = Math.max(1, 
Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
@@ -493,45 +491,6 @@ public class IndexPersistenceMgr {
         return openFileLimit;
     }
 
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskJustWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-        };
-    }
-
     private void relocateIndexFileAndFlushHeader(long ledger, FileInfo fi) 
throws IOException {
         File currentDir = getLedgerDirForLedger(fi);
         if (ledgerDirsManager.isDirFull(currentDir)) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index ce1a499..2c576d7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -113,10 +113,6 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
 
     private LedgerDirsListener getLedgerDirsListener() {
         return new LedgerDirsListener() {
-            @Override
-            public void diskFailed(File disk) {
-                // do nothing.
-            }
 
             @Override
             public void diskAlmostFull(File disk) {
@@ -138,7 +134,7 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
             }
 
             @Override
-            public void allDisksFull() {
+            public void allDisksFull(boolean highPriorityWritesAllowed) {
                 if (gcThread.isForceGCAllowWhenNoSpace) {
                     gcThread.enableForceGC();
                 } else {
@@ -148,11 +144,6 @@ public class InterleavedLedgerStorage implements 
CompactableLedgerStorage, Entry
             }
 
             @Override
-            public void fatalError() {
-                // do nothing.
-            }
-
-            @Override
             public void diskWritable(File disk) {
                 // we have enough space now
                 if (gcThread.isForceGCAllowWhenNoSpace) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 4bd28b2..5d2c11f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -373,43 +373,49 @@ public class LedgerDirsManager {
          *
          * @param disk Failed disk
          */
-        void diskFailed(File disk);
+        default void diskFailed(File disk) {}
 
         /**
          * Notified when the disk usage warn threshold is exceeded on the 
drive.
          * @param disk
          */
-        void diskAlmostFull(File disk);
+        default void diskAlmostFull(File disk) {}
 
         /**
          * This will be notified on disk detected as full.
          *
          * @param disk Filled disk
          */
-        void diskFull(File disk);
+        default void diskFull(File disk) {}
 
         /**
          * This will be notified on disk detected as writable and under warn 
threshold.
          *
          * @param disk Writable disk
          */
-        void diskWritable(File disk);
+        default void diskWritable(File disk) {}
 
         /**
          * This will be notified on disk detected as writable but still in 
warn threshold.
          *
          * @param disk Writable disk
          */
-        void diskJustWritable(File disk);
+        default void diskJustWritable(File disk) {}
 
         /**
          * This will be notified whenever all disks are detected as full.
+         *
+         * <p>Normal writes will be rejected when disks are detected as 
"full". High priority writes
+         * such as ledger recovery writes can go through if disks are still 
available.
+         *
+         * @param highPriorityWritesAllowed the parameter indicates we are 
still have disk spaces for high priority
+         *                                  writes even disks are detected as 
"full"
          */
-        void allDisksFull();
+        default void allDisksFull(boolean highPriorityWritesAllowed) {}
 
         /**
          * This will notify the fatal errors.
          */
-        void fatalError();
+        default void fatalError() {}
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index afd8ad8..2904643 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -53,6 +53,7 @@ class LedgerDirsMonitor {
     private final ConcurrentMap<File, Float> diskUsages;
     private final DiskChecker diskChecker;
     private final LedgerDirsManager ldm;
+    private long minUsableSizeForHighPriorityWrites;
     private ScheduledExecutorService executor;
     private ScheduledFuture<?> checkTask;
 
@@ -60,6 +61,7 @@ class LedgerDirsMonitor {
                              final DiskChecker diskChecker,
                              final LedgerDirsManager ldm) {
         this.interval = conf.getDiskCheckInterval();
+        this.minUsableSizeForHighPriorityWrites = 
conf.getMinUsableSizeForHighPriorityWrites();
         this.conf = conf;
         this.diskChecker = diskChecker;
         this.diskUsages = ldm.getDiskUsages();
@@ -98,8 +100,14 @@ class LedgerDirsMonitor {
             // bookie cannot get writable dir but considered to be writable
             ldm.getWritableLedgerDirs();
         } catch (NoWritableLedgerDirException e) {
+            boolean highPriorityWritesAllowed = true;
+            try {
+                
ldm.getDirsAboveUsableThresholdSize(minUsableSizeForHighPriorityWrites);
+            } catch (NoWritableLedgerDirException e1) {
+                highPriorityWritesAllowed = false;
+            }
             for (LedgerDirsListener listener : ldm.getListeners()) {
-                listener.allDisksFull();
+                listener.allDisksFull(highPriorityWritesAllowed);
             }
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
index ad4ac0c..538f3ac 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
@@ -25,13 +25,26 @@ import java.util.concurrent.Future;
  */
 public interface StateManager extends AutoCloseable {
 
-
     /**
      * Init state of Bookie when launch bookie.
      */
     void initState();
 
     /**
+     * Check if the bookie is available for high priority writes or not.
+     *
+     * @return true if the bookie is available for high priority writes; 
otherwise false.
+     */
+    boolean isAvailableForHighPriorityWrites();
+
+    /**
+     * Enable/Disable the availability for high priority writes.
+     *
+     * @param available the flag to enable/disable the availability for high 
priority writes.
+     */
+    void setHighPriorityWritesAvailability(boolean available);
+
+    /**
      * Check is ReadOnly.
      */
     boolean isReadOnly();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 466d46a..a7c3a7a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -122,7 +122,7 @@ class SyncThread implements Checkpointer {
             ledgerStorage.flush();
         } catch (NoWritableLedgerDirException e) {
             log.error("No writeable ledger directories", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
             return;
         } catch (IOException e) {
             log.error("Exception flushing ledgers", e);
@@ -138,7 +138,7 @@ class SyncThread implements Checkpointer {
             checkpointSource.checkpointComplete(checkpoint, false);
         } catch (IOException e) {
             log.error("Exception marking checkpoint as complete", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
         }
     }
 
@@ -153,7 +153,7 @@ class SyncThread implements Checkpointer {
             ledgerStorage.checkpoint(checkpoint);
         } catch (NoWritableLedgerDirException e) {
             log.error("No writeable ledger directories", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
             return;
         } catch (IOException e) {
             log.error("Exception flushing ledgers", e);
@@ -164,7 +164,7 @@ class SyncThread implements Checkpointer {
             checkpointSource.checkpointComplete(checkpoint, true);
         } catch (IOException e) {
             log.error("Exception marking checkpoint as complete", e);
-            dirsListener.allDisksFull();
+            dirsListener.allDisksFull(true);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index e9950fd..cb39821 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -162,6 +162,7 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS = 
"bookieAuthProviderFactoryClass";
 
     protected static final String MIN_USABLESIZE_FOR_INDEXFILE_CREATION = 
"minUsableSizeForIndexFileCreation";
+    protected static final String MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES = 
"minUsableSizeForHighPriorityWrites";
 
     protected static final String ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION =
         "allowMultipleDirsUnderSameDiskPartition";
@@ -2523,7 +2524,30 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
      * @return
      */
     public ServerConfiguration setMinUsableSizeForIndexFileCreation(long 
minUsableSizeForIndexFileCreation) {
-        this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION, 
Long.toString(minUsableSizeForIndexFileCreation));
+        this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION, 
minUsableSizeForIndexFileCreation);
+        return this;
+    }
+
+    /**
+     * Gets the minimum safe usable size to be available in ledger directory 
for Bookie to accept high priority writes.
+     *
+     * <p>If not set, it is two times of {@link #getEntryLogSizeLimit()}.
+     *
+     * @return the minimum safe usable size per ledger directory for bookie to 
accept high priority writes.
+     */
+    public long getMinUsableSizeForHighPriorityWrites() {
+        return this.getLong(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES, 2 * 
getEntryLogSizeLimit());
+    }
+
+    /**
+     * Sets the minimum safe usable size to be available in ledger directory 
for Bookie to accept high priority writes.
+     *
+     * @param minUsableSizeForHighPriorityWrites minimum safe usable size per 
ledger directory for Bookie to accept
+     *                                           high priority writes
+     * @return server configuration.
+     */
+    public ServerConfiguration setMinUsableSizeForHighPriorityWrites(long 
minUsableSizeForHighPriorityWrites) {
+        this.setProperty(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES, 
minUsableSizeForHighPriorityWrites);
         return this;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4102e75..edb8924 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
 /**
  * An implementation of the RequestProcessor interface.
  */
+@Getter(AccessLevel.PACKAGE)
 public class BookieRequestProcessor implements RequestProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -94,7 +95,6 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     /**
      * The threadpool used to execute all read entry requests issued to this 
server.
      */
-    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor readThreadPool;
 
     /**
@@ -111,7 +111,6 @@ public class BookieRequestProcessor implements 
RequestProcessor {
      * The threadpool used to execute all long poll requests issued to this 
server
      * after they are done waiting.
      */
-    @Getter(AccessLevel.PACKAGE)
     private final OrderedExecutor longPollThreadPool;
 
     /**
@@ -127,8 +126,8 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     // Expose Stats
     private final BKStats bkStats = BKStats.getInstance();
     private final boolean statsEnabled;
-    final OpStatsLogger addRequestStats;
-    final OpStatsLogger addEntryStats;
+    private final OpStatsLogger addRequestStats;
+    private final OpStatsLogger addEntryStats;
     final OpStatsLogger readRequestStats;
     final OpStatsLogger readEntryStats;
     final OpStatsLogger fenceReadRequestStats;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index e84bbee..9cd9fd5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -58,9 +58,11 @@ public abstract class PacketProcessorBaseV3 extends 
SafeRunnable {
             public void operationComplete(ChannelFuture future) throws 
Exception {
                 long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
                 if (!future.isSuccess()) {
-                    
requestProcessor.channelWriteStats.registerFailedEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
+                    requestProcessor.getChannelWriteStats()
+                        .registerFailedEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
                 } else {
-                    
requestProcessor.channelWriteStats.registerSuccessfulEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
+                    requestProcessor.getChannelWriteStats()
+                        .registerSuccessfulEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
                 }
                 if (StatusCode.EOK == code) {
                     
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index eaf2473..f5af75a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.util.Recycler;
@@ -56,12 +57,13 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
 
     @Override
     protected void processPacket() {
-        if (requestProcessor.bookie.isReadOnly()) {
+        if (requestProcessor.getBookie().isReadOnly()
+            && !(request.isHighPriority() && 
requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
             LOG.warn("BookieServer is running in readonly mode,"
                     + " so rejecting the request from the client!");
             sendResponse(BookieProtocol.EREADONLY,
                          
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
             request.release();
             request.recycle();
             return;
@@ -72,9 +74,9 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
         ByteBuf addData = request.getData();
         try {
             if (request.isRecoveryAdd()) {
-                requestProcessor.bookie.recoveryAddEntry(addData, this, 
channel, request.getMasterKey());
+                requestProcessor.getBookie().recoveryAddEntry(addData, this, 
channel, request.getMasterKey());
             } else {
-                requestProcessor.bookie.addEntry(addData, false, this, 
channel, request.getMasterKey());
+                requestProcessor.getBookie().addEntry(addData, false, this, 
channel, request.getMasterKey());
             }
         } catch (OperationRejectedException e) {
             // Avoid to log each occurence of this exception as this can 
happen when the ledger storage is
@@ -102,11 +104,11 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
         }
 
         if (rc != BookieProtocol.EOK) {
-            
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
             sendResponse(rc,
                          ResponseBuilder.buildErrorResponse(rc, request),
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
             request.recycle();
         }
     }
@@ -115,15 +117,15 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
     public void writeComplete(int rc, long ledgerId, long entryId,
                               BookieSocketAddress addr, Object ctx) {
         if (BookieProtocol.EOK == rc) {
-            
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+            
requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
         } else {
-            
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+            
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                     TimeUnit.NANOSECONDS);
         }
         sendResponse(rc,
                      ResponseBuilder.buildAddResponse(request),
-                     requestProcessor.addRequestStats);
+                     requestProcessor.getAddRequestStats());
         request.recycle();
         recycle();
     }
@@ -134,7 +136,8 @@ class WriteEntryProcessor extends 
PacketProcessorBase<ParsedAddRequest> implemen
                              request.getLedgerId(), request.getEntryId());
     }
 
-    private void recycle() {
+    @VisibleForTesting
+    void recycle() {
         reset();
         recyclerHandle.recycle(this);
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index c0e097a..578d666 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -65,7 +65,9 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             return addResponse.build();
         }
 
-        if (requestProcessor.bookie.isReadOnly()) {
+        if (requestProcessor.getBookie().isReadOnly()
+            && !(RequestUtils.isHighPriority(request)
+                    && 
requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
             logger.warn("BookieServer is running as readonly mode, so 
rejecting the request from the client!");
             addResponse.setStatus(StatusCode.EREADONLY);
             return addResponse.build();
@@ -76,10 +78,10 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
             public void writeComplete(int rc, long ledgerId, long entryId,
                                       BookieSocketAddress addr, Object ctx) {
                 if (BookieProtocol.EOK == rc) {
-                    
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    
requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                             TimeUnit.NANOSECONDS);
                 } else {
-                    
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+                    
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
                             TimeUnit.NANOSECONDS);
                 }
 
@@ -101,7 +103,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
                         .setStatus(addResponse.getStatus())
                         .setAddResponse(addResponse);
                 Response resp = response.build();
-                sendResponse(status, resp, requestProcessor.addRequestStats);
+                sendResponse(status, resp, 
requestProcessor.getAddRequestStats());
             }
         };
         final EnumSet<WriteFlag> writeFlags;
@@ -116,9 +118,9 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
         ByteBuf entryToAdd = 
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
         try {
             if (RequestUtils.hasFlag(addRequest, 
AddRequest.Flag.RECOVERY_ADD)) {
-                requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, 
channel, masterKey);
+                requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb, 
channel, masterKey);
             } else {
-                requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync, 
wcb, channel, masterKey);
+                requestProcessor.getBookie().addEntry(entryToAdd, 
ackBeforeSync, wcb, channel, masterKey);
             }
             status = StatusCode.EOK;
         } catch (OperationRejectedException e) {
@@ -167,7 +169,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
                     .setAddResponse(addResponse);
             Response resp = response.build();
             sendResponse(addResponse.getStatus(), resp,
-                         requestProcessor.addRequestStats);
+                         requestProcessor.getAddRequestStats());
         }
     }
 
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index ad7ba1e..ce084b3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -167,9 +167,6 @@ public class BookieStorageThresholdTest extends 
BookKeeperClusterTestCase {
         final CountDownLatch diskWritable = new CountDownLatch(1);
         final CountDownLatch diskFull = new CountDownLatch(1);
         ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
-            @Override
-            public void fatalError() {
-            }
 
             @Override
             public void diskWritable(File disk) {
@@ -177,25 +174,10 @@ public class BookieStorageThresholdTest extends 
BookKeeperClusterTestCase {
             }
 
             @Override
-            public void diskJustWritable(File disk) {
-            }
-
-            @Override
             public void diskFull(File disk) {
                 diskFull.countDown();
             }
 
-            @Override
-            public void diskFailed(File disk) {
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-            }
-
-            @Override
-            public void allDisksFull() {
-            }
         });
 
         // Dependency Injected class
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 4efc69f..1731b9b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -181,20 +181,42 @@ public class TestLedgerDirsManager {
 
     @Test
     public void testLedgerDirsMonitorDuringTransition() throws Exception {
+        testLedgerDirsMonitorDuringTransition(true);
+    }
+
+    @Test
+    public void testHighPriorityWritesDisallowedDuringTransition() throws 
Exception {
+        testLedgerDirsMonitorDuringTransition(false);
+    }
+
+    private void testLedgerDirsMonitorDuringTransition(boolean 
highPriorityWritesAllowed) throws Exception {
+        if (!highPriorityWritesAllowed) {
+            ledgerMonitor.shutdown();
+            conf.setMinUsableSizeForHighPriorityWrites(curDir.getUsableSpace() 
+ 1024);
+            dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), 
conf.getDiskUsageWarnThreshold()), statsLogger);
+            ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, 
dirsManager);
+            ledgerMonitor.init();
+        }
+
         MockLedgerDirsListener mockLedgerDirsListener = new 
MockLedgerDirsListener();
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
         ledgerMonitor.start();
 
         assertFalse(mockLedgerDirsListener.readOnly);
-        mockDiskChecker.setUsage(threshold + 0.05f);
+        assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
 
+        mockDiskChecker.setUsage(threshold + 0.05f);
         executorController.advance(Duration.ofMillis(diskCheckInterval));
+
         assertTrue(mockLedgerDirsListener.readOnly);
+        assertEquals(highPriorityWritesAllowed, 
mockLedgerDirsListener.highPriorityWritesAllowed);
 
         mockDiskChecker.setUsage(threshold - 0.05f);
         executorController.advance(Duration.ofMillis(diskCheckInterval));
 
         assertFalse(mockLedgerDirsListener.readOnly);
+        assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
     }
 
     @Test
@@ -427,6 +449,7 @@ public class TestLedgerDirsManager {
 
     private class MockLedgerDirsListener implements LedgerDirsListener {
 
+        public volatile boolean highPriorityWritesAllowed;
         public volatile boolean readOnly;
 
         public MockLedgerDirsListener() {
@@ -434,38 +457,26 @@ public class TestLedgerDirsManager {
         }
 
         @Override
-        public void diskFailed(File disk) {
-        }
-
-        @Override
-        public void diskAlmostFull(File disk) {
-        }
-
-        @Override
-        public void diskFull(File disk) {
-        }
-
-        @Override
         public void diskWritable(File disk) {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
         @Override
         public void diskJustWritable(File disk) {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
         @Override
-        public void allDisksFull() {
-            readOnly = true;
-        }
-
-        @Override
-        public void fatalError() {
+        public void allDisksFull(boolean highPriorityWritesAllowed) {
+            this.readOnly = true;
+            this.highPriorityWritesAllowed = highPriorityWritesAllowed;
         }
 
         public void reset() {
             readOnly = false;
+            highPriorityWritesAllowed = true;
         }
 
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index e6cb93d..d9fa8cc 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
-import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -81,7 +80,7 @@ public class TestSyncThread {
         ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
-        LedgerDirsListener listener = new DummyLedgerDirsListener();
+        LedgerDirsListener listener = new LedgerDirsListener() {};
 
         final CountDownLatch checkpointCalledLatch = new CountDownLatch(1);
         final CountDownLatch checkpointLatch = new CountDownLatch(1);
@@ -154,7 +153,7 @@ public class TestSyncThread {
         ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
-        LedgerDirsListener listener = new DummyLedgerDirsListener();
+        LedgerDirsListener listener = new LedgerDirsListener() {};
 
         final AtomicInteger checkpointCount = new AtomicInteger(0);
         LedgerStorage storage = new DummyLedgerStorage() {
@@ -200,7 +199,7 @@ public class TestSyncThread {
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
         final CountDownLatch fatalLatch = new CountDownLatch(1);
-        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+        LedgerDirsListener listener = new LedgerDirsListener() {
                 @Override
                 public void fatalError() {
                     fatalLatch.countDown();
@@ -232,9 +231,9 @@ public class TestSyncThread {
         conf.setFlushInterval(flushInterval);
         CheckpointSource checkpointSource = new DummyCheckpointSource();
         final CountDownLatch diskFullLatch = new CountDownLatch(1);
-        LedgerDirsListener listener = new DummyLedgerDirsListener() {
+        LedgerDirsListener listener = new LedgerDirsListener() {
                 @Override
-                public void allDisksFull() {
+                public void allDisksFull(boolean highPriorityWritesAllowed) {
                     diskFullLatch.countDown();
                 }
             };
@@ -363,34 +362,4 @@ public class TestSyncThread {
         }
     }
 
-    private static class DummyLedgerDirsListener
-        implements LedgerDirsManager.LedgerDirsListener {
-        @Override
-        public void diskFailed(File disk) {
-        }
-
-        @Override
-        public void diskAlmostFull(File disk) {
-        }
-
-        @Override
-        public void diskFull(File disk) {
-        }
-
-        @Override
-        public void allDisksFull() {
-        }
-
-        @Override
-        public void fatalError() {
-        }
-
-        @Override
-        public void diskWritable(File disk) {
-        }
-
-        @Override
-        public void diskJustWritable(File disk) {
-        }
-    }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
new file mode 100644
index 0000000..5901c2f
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessor}.
+ */
+public class WriteEntryProcessorTest {
+
+    private ParsedAddRequest request;
+    private WriteEntryProcessor processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() {
+        request = ParsedAddRequest.create(
+            BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            System.currentTimeMillis(),
+            System.currentTimeMillis() + 1,
+            (short) 0,
+            new byte[0],
+            Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getAddEntryStats())
+            
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+        when(requestProcessor.getAddRequestStats())
+            
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        processor = WriteEntryProcessor.create(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    private void reinitRequest(short flags) {
+        request.release();
+        request.recycle();
+        processor.recycle();
+
+        request = ParsedAddRequest.create(
+            BookieProtocol.CURRENT_PROTOCOL_VERSION,
+            System.currentTimeMillis(),
+            System.currentTimeMillis() + 1,
+            flags,
+            new byte[0],
+            Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+        processor = WriteEntryProcessor.create(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    @Test
+    public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(), 
any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void 
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws 
Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(), 
any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void 
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws 
Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            processor.writeComplete(0, request.ledgerId, request.entryId, 
null, null);
+            return null;
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), 
same(processor), same(channel), eq(new byte[0]));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), 
same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), 
any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+    @Test
+    public void testNormalWritesOnWritableBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            processor.writeComplete(0, request.ledgerId, request.entryId, 
null, null);
+            return null;
+        }).when(bookie).addEntry(any(ByteBuf.class), eq(false), 
same(processor), same(channel), eq(new byte[0]));
+
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return null;
+        }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), same(processor), 
same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(), 
any(ChannelPromise.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+        response.release();
+        response.recycle();
+    }
+
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
new file mode 100644
index 0000000..8f54ddb
--- /dev/null
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessor}.
+ */
+public class WriteEntryProcessorV3Test {
+
+    private Request request;
+    private WriteEntryProcessorV3 processor;
+    private Channel channel;
+    private BookieRequestProcessor requestProcessor;
+    private Bookie bookie;
+
+    @Before
+    public void setup() {
+        request = Request.newBuilder()
+            .setHeader(BKPacketHeader.newBuilder()
+                .setTxnId(System.currentTimeMillis())
+                .setVersion(ProtocolVersion.VERSION_THREE)
+                .setOperation(OperationType.ADD_ENTRY)
+                .build())
+            .setAddRequest(AddRequest.newBuilder()
+                .setLedgerId(System.currentTimeMillis())
+                .setEntryId(System.currentTimeMillis() + 1)
+                .setBody(ByteString.copyFromUtf8("test-entry-data"))
+                .setMasterKey(ByteString.copyFrom(new byte[0]))
+                .build())
+            .build();
+        channel = mock(Channel.class);
+        bookie = mock(Bookie.class);
+        requestProcessor = mock(BookieRequestProcessor.class);
+        when(requestProcessor.getBookie()).thenReturn(bookie);
+        when(requestProcessor.getAddEntryStats())
+            
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+        when(requestProcessor.getAddRequestStats())
+            
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+        processor = new WriteEntryProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    private void reinitRequest(int priority) {
+        request = Request.newBuilder(request)
+            .setHeader(BKPacketHeader.newBuilder(request.getHeader())
+                .setPriority(priority)
+                .build())
+            .build();
+
+        processor = new WriteEntryProcessorV3(
+            request,
+            channel,
+            requestProcessor);
+    }
+
+    @Test
+    public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EREADONLY, response.getStatus());
+    }
+
+    @Test
+    public void 
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws 
Exception {
+        reinitRequest(100);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EREADONLY, response.getStatus());
+    }
+
+    @Test
+    public void 
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws 
Exception {
+        reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+        when(bookie.isReadOnly()).thenReturn(true);
+        when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(2);
+
+            wc.writeComplete(
+                0,
+                request.getAddRequest().getLedgerId(),
+                request.getAddRequest().getEntryId(),
+                null,
+                null);
+            return null;
+        }).when(bookie).addEntry(
+            any(ByteBuf.class),
+            eq(false),
+            any(WriteCallback.class),
+            same(channel),
+            eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), 
same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+    }
+
+    @Test
+    public void testNormalWritesOnWritableBookie() throws Exception {
+        when(bookie.isReadOnly()).thenReturn(false);
+        when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+        doAnswer(invocationOnMock -> {
+            WriteCallback wc = invocationOnMock.getArgument(2);
+
+            wc.writeComplete(
+                0,
+                request.getAddRequest().getLedgerId(),
+                request.getAddRequest().getEntryId(),
+                null,
+                null);
+            return null;
+        }).when(bookie).addEntry(
+            any(ByteBuf.class), eq(false), any(WriteCallback.class), 
same(channel), eq(new byte[0]));
+
+        ChannelPromise promise = new DefaultChannelPromise(channel);
+        AtomicReference<Object> writtenObject = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            writtenObject.set(invocationOnMock.getArgument(0));
+            latch.countDown();
+            return promise;
+        }).when(channel).writeAndFlush(any());
+
+        processor.run();
+
+        verify(bookie, times(1))
+            .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class), 
same(channel), eq(new byte[0]));
+        verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+        latch.await();
+
+        assertTrue(writtenObject.get() instanceof Response);
+        Response response = (Response) writtenObject.get();
+        assertEquals(StatusCode.EOK, response.getStatus());
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to