This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 08d3cf8 ISSUE #1314: Provide a mechanism to allow high priority
writes to readonly bookies
08d3cf8 is described below
commit 08d3cf8f014451dbc1485c8fc4985b2ada30a55c
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Apr 5 00:02:59 2018 -0700
ISSUE #1314: Provide a mechanism to allow high priority writes to readonly
bookies
Descriptions of the changes in this PR:
*Problem*
Currently we allow fence requests to go through readonly bookies, since
fence requests are special read requests. However, a ledger can only be sealed
after it is successfully recovered. If bookies are in readonly mode, those
recovery writes won't go through.
If a bookie outage happens (e.g. all bookies are readonly), no ledgers can
be sealed. It might be good to have a setting similar to
minUsableSizeForIndexFileCreation for recovery writes.
This can improve operability during an outage.
*Solution*
- add a setting `minUsableSizeForHighPriorityWrites` to allow accepting
high priority writes on readonly bookies.
Master Issue: #1314
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Matteo Merli
<[email protected]>
This closes #1315 from sijie/allow_high_priority_writes, closes #1314
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 24 +-
.../bookkeeper/bookie/BookieStateManager.java | 12 +-
.../org/apache/bookkeeper/bookie/BookieStatus.java | 3 +-
.../org/apache/bookkeeper/bookie/EntryLogger.java | 25 ---
.../bookkeeper/bookie/IndexPersistenceMgr.java | 41 ----
.../bookie/InterleavedLedgerStorage.java | 11 +-
.../bookkeeper/bookie/LedgerDirsManager.java | 20 +-
.../bookkeeper/bookie/LedgerDirsMonitor.java | 10 +-
.../org/apache/bookkeeper/bookie/StateManager.java | 15 +-
.../org/apache/bookkeeper/bookie/SyncThread.java | 8 +-
.../bookkeeper/conf/ServerConfiguration.java | 26 ++-
.../bookkeeper/proto/BookieRequestProcessor.java | 7 +-
.../bookkeeper/proto/PacketProcessorBaseV3.java | 6 +-
.../bookkeeper/proto/WriteEntryProcessor.java | 23 +-
.../bookkeeper/proto/WriteEntryProcessorV3.java | 16 +-
.../bookie/BookieStorageThresholdTest.java | 18 --
.../bookkeeper/bookie/TestLedgerDirsManager.java | 49 +++--
.../apache/bookkeeper/bookie/TestSyncThread.java | 41 +---
.../bookkeeper/proto/WriteEntryProcessorTest.java | 224 +++++++++++++++++++
.../proto/WriteEntryProcessorV3Test.java | 245 +++++++++++++++++++++
20 files changed, 624 insertions(+), 200 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 7322173..b391c53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -886,24 +886,15 @@ public class Bookie extends BookieCriticalThread {
return new LedgerDirsListener() {
@Override
- public void diskFull(File disk) {
- // Nothing needs to be handled here.
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- // Nothing needs to be handled here.
- }
-
- @Override
public void diskFailed(File disk) {
// Shutdown the bookie on disk failure.
triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
}
@Override
- public void allDisksFull() {
+ public void allDisksFull(boolean highPriorityWritesAllowed) {
// Transition to readOnly mode on all disks full
+
stateManager.setHighPriorityWritesAvailability(highPriorityWritesAllowed);
stateManager.transitionToReadOnlyMode();
}
@@ -916,12 +907,14 @@ public class Bookie extends BookieCriticalThread {
@Override
public void diskWritable(File disk) {
// Transition to writable mode when a disk becomes writable
again.
+ stateManager.setHighPriorityWritesAvailability(true);
stateManager.transitionToWritableMode();
}
@Override
public void diskJustWritable(File disk) {
// Transition to writable mode when a disk becomes writable
again.
+ stateManager.setHighPriorityWritesAvailability(true);
stateManager.transitionToWritableMode();
}
};
@@ -962,6 +955,15 @@ public class Bookie extends BookieCriticalThread {
return stateManager.isReadOnly();
}
+ /**
+ * Check whether Bookie is available for high priority writes.
+ *
+ * @return true if the bookie is able to take high priority writes.
+ */
+ public boolean isAvailableForHighPriorityWrites() {
+ return stateManager.isAvailableForHighPriorityWrites();
+ }
+
public boolean isRunning() {
return stateManager.isRunning();
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 798db41..1689b11 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -61,6 +61,7 @@ public class BookieStateManager implements StateManager {
private final BookieStatus bookieStatus = new BookieStatus();
private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
+ private volatile boolean availableForHighPriorityWrites = true;
private final String bookieId;
private ShutdownHandler shutdownHandler;
@@ -68,7 +69,6 @@ public class BookieStateManager implements StateManager {
// Expose Stats
private final StatsLogger statsLogger;
-
public BookieStateManager(ServerConfiguration conf, StatsLogger
statsLogger,
MetadataBookieDriver metadataDriver, LedgerDirsManager
ledgerDirsManager) throws IOException {
this.conf = conf;
@@ -136,6 +136,16 @@ public class BookieStateManager implements StateManager {
}
@Override
+ public boolean isAvailableForHighPriorityWrites() {
+ return availableForHighPriorityWrites;
+ }
+
+ @Override
+ public void setHighPriorityWritesAvailability(boolean available) {
+ this.availableForHighPriorityWrites = available;
+ }
+
+ @Override
public boolean isRunning(){
return running;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
index 7c702da..49aa66e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
@@ -45,7 +45,7 @@ public class BookieStatus {
enum BookieMode {
READ_ONLY,
- READ_WRITE;
+ READ_WRITE
}
private static final long INVALID_UPDATE_TIME = -1;
@@ -54,7 +54,6 @@ public class BookieStatus {
private long lastUpdateTime;
private volatile BookieMode bookieMode;
-
BookieStatus() {
this.bookieMode = BookieMode.READ_WRITE;
this.layoutVersion = CURRENT_STATUS_LAYOUT_VERSION;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 2055758..66efbaa 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -439,31 +439,6 @@ public class EntryLogger {
shouldCreateNewEntryLog.set(true);
}
}
-
- @Override
- public void diskFailed(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void allDisksFull() {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void fatalError() {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskWritable(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskJustWritable(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
};
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 05b5eb5..eb3b935 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -42,7 +42,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -108,7 +107,6 @@ public class IndexPersistenceMgr {
LOG.info("openFileLimit = {}", openFileLimit);
// Retrieve all of the active ledgers.
getActiveLedgers();
- ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
// build the file info cache
int concurrencyLevel = Math.max(1,
Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
@@ -493,45 +491,6 @@ public class IndexPersistenceMgr {
return openFileLimit;
}
- private LedgerDirsListener getLedgerDirsListener() {
- return new LedgerDirsListener() {
- @Override
- public void diskFull(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskFailed(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void allDisksFull() {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void fatalError() {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskWritable(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
-
- @Override
- public void diskJustWritable(File disk) {
- // Nothing to handle here. Will be handled in Bookie
- }
- };
- }
-
private void relocateIndexFileAndFlushHeader(long ledger, FileInfo fi)
throws IOException {
File currentDir = getLedgerDirForLedger(fi);
if (ledgerDirsManager.isDirFull(currentDir)) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index ce1a499..2c576d7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -113,10 +113,6 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
private LedgerDirsListener getLedgerDirsListener() {
return new LedgerDirsListener() {
- @Override
- public void diskFailed(File disk) {
- // do nothing.
- }
@Override
public void diskAlmostFull(File disk) {
@@ -138,7 +134,7 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
- public void allDisksFull() {
+ public void allDisksFull(boolean highPriorityWritesAllowed) {
if (gcThread.isForceGCAllowWhenNoSpace) {
gcThread.enableForceGC();
} else {
@@ -148,11 +144,6 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
- public void fatalError() {
- // do nothing.
- }
-
- @Override
public void diskWritable(File disk) {
// we have enough space now
if (gcThread.isForceGCAllowWhenNoSpace) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 4bd28b2..5d2c11f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -373,43 +373,49 @@ public class LedgerDirsManager {
*
* @param disk Failed disk
*/
- void diskFailed(File disk);
+ default void diskFailed(File disk) {}
/**
* Notified when the disk usage warn threshold is exceeded on the
drive.
* @param disk
*/
- void diskAlmostFull(File disk);
+ default void diskAlmostFull(File disk) {}
/**
* This will be notified on disk detected as full.
*
* @param disk Filled disk
*/
- void diskFull(File disk);
+ default void diskFull(File disk) {}
/**
* This will be notified on disk detected as writable and under warn
threshold.
*
* @param disk Writable disk
*/
- void diskWritable(File disk);
+ default void diskWritable(File disk) {}
/**
* This will be notified on disk detected as writable but still in
warn threshold.
*
* @param disk Writable disk
*/
- void diskJustWritable(File disk);
+ default void diskJustWritable(File disk) {}
/**
* This will be notified whenever all disks are detected as full.
+ *
+ * <p>Normal writes will be rejected when disks are detected as
"full". High priority writes
+ * such as ledger recovery writes can go through if disks are still
available.
+ *
+ * @param highPriorityWritesAllowed the parameter indicates we are
still have disk spaces for high priority
+ * writes even disks are detected as
"full"
*/
- void allDisksFull();
+ default void allDisksFull(boolean highPriorityWritesAllowed) {}
/**
* This will notify the fatal errors.
*/
- void fatalError();
+ default void fatalError() {}
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index afd8ad8..2904643 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -53,6 +53,7 @@ class LedgerDirsMonitor {
private final ConcurrentMap<File, Float> diskUsages;
private final DiskChecker diskChecker;
private final LedgerDirsManager ldm;
+ private long minUsableSizeForHighPriorityWrites;
private ScheduledExecutorService executor;
private ScheduledFuture<?> checkTask;
@@ -60,6 +61,7 @@ class LedgerDirsMonitor {
final DiskChecker diskChecker,
final LedgerDirsManager ldm) {
this.interval = conf.getDiskCheckInterval();
+ this.minUsableSizeForHighPriorityWrites =
conf.getMinUsableSizeForHighPriorityWrites();
this.conf = conf;
this.diskChecker = diskChecker;
this.diskUsages = ldm.getDiskUsages();
@@ -98,8 +100,14 @@ class LedgerDirsMonitor {
// bookie cannot get writable dir but considered to be writable
ldm.getWritableLedgerDirs();
} catch (NoWritableLedgerDirException e) {
+ boolean highPriorityWritesAllowed = true;
+ try {
+
ldm.getDirsAboveUsableThresholdSize(minUsableSizeForHighPriorityWrites);
+ } catch (NoWritableLedgerDirException e1) {
+ highPriorityWritesAllowed = false;
+ }
for (LedgerDirsListener listener : ldm.getListeners()) {
- listener.allDisksFull();
+ listener.allDisksFull(highPriorityWritesAllowed);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
index ad4ac0c..538f3ac 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/StateManager.java
@@ -25,13 +25,26 @@ import java.util.concurrent.Future;
*/
public interface StateManager extends AutoCloseable {
-
/**
* Init state of Bookie when launch bookie.
*/
void initState();
/**
+ * Check if the bookie is available for high priority writes or not.
+ *
+ * @return true if the bookie is available for high priority writes;
otherwise false.
+ */
+ boolean isAvailableForHighPriorityWrites();
+
+ /**
+ * Enable/Disable the availability for high priority writes.
+ *
+ * @param available the flag to enable/disable the availability for high
priority writes.
+ */
+ void setHighPriorityWritesAvailability(boolean available);
+
+ /**
* Check is ReadOnly.
*/
boolean isReadOnly();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 466d46a..a7c3a7a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -122,7 +122,7 @@ class SyncThread implements Checkpointer {
ledgerStorage.flush();
} catch (NoWritableLedgerDirException e) {
log.error("No writeable ledger directories", e);
- dirsListener.allDisksFull();
+ dirsListener.allDisksFull(true);
return;
} catch (IOException e) {
log.error("Exception flushing ledgers", e);
@@ -138,7 +138,7 @@ class SyncThread implements Checkpointer {
checkpointSource.checkpointComplete(checkpoint, false);
} catch (IOException e) {
log.error("Exception marking checkpoint as complete", e);
- dirsListener.allDisksFull();
+ dirsListener.allDisksFull(true);
}
}
@@ -153,7 +153,7 @@ class SyncThread implements Checkpointer {
ledgerStorage.checkpoint(checkpoint);
} catch (NoWritableLedgerDirException e) {
log.error("No writeable ledger directories", e);
- dirsListener.allDisksFull();
+ dirsListener.allDisksFull(true);
return;
} catch (IOException e) {
log.error("Exception flushing ledgers", e);
@@ -164,7 +164,7 @@ class SyncThread implements Checkpointer {
checkpointSource.checkpointComplete(checkpoint, true);
} catch (IOException e) {
log.error("Exception marking checkpoint as complete", e);
- dirsListener.allDisksFull();
+ dirsListener.allDisksFull(true);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index e9950fd..cb39821 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -162,6 +162,7 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
protected static final String BOOKIE_AUTH_PROVIDER_FACTORY_CLASS =
"bookieAuthProviderFactoryClass";
protected static final String MIN_USABLESIZE_FOR_INDEXFILE_CREATION =
"minUsableSizeForIndexFileCreation";
+ protected static final String MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES =
"minUsableSizeForHighPriorityWrites";
protected static final String ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION =
"allowMultipleDirsUnderSameDiskPartition";
@@ -2523,7 +2524,30 @@ public class ServerConfiguration extends
AbstractConfiguration<ServerConfigurati
* @return
*/
public ServerConfiguration setMinUsableSizeForIndexFileCreation(long
minUsableSizeForIndexFileCreation) {
- this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION,
Long.toString(minUsableSizeForIndexFileCreation));
+ this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION,
minUsableSizeForIndexFileCreation);
+ return this;
+ }
+
+ /**
+ * Gets the minimum safe usable size to be available in ledger directory
for Bookie to accept high priority writes.
+ *
+ * <p>If not set, it is two times of {@link #getEntryLogSizeLimit()}.
+ *
+ * @return the minimum safe usable size per ledger directory for bookie to
accept high priority writes.
+ */
+ public long getMinUsableSizeForHighPriorityWrites() {
+ return this.getLong(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES, 2 *
getEntryLogSizeLimit());
+ }
+
+ /**
+ * Sets the minimum safe usable size to be available in ledger directory
for Bookie to accept high priority writes.
+ *
+ * @param minUsableSizeForHighPriorityWrites minimum safe usable size per
ledger directory for Bookie to accept
+ * high priority writes
+ * @return server configuration.
+ */
+ public ServerConfiguration setMinUsableSizeForHighPriorityWrites(long
minUsableSizeForHighPriorityWrites) {
+ this.setProperty(MIN_USABLESIZE_FOR_HIGH_PRIORITY_WRITES,
minUsableSizeForHighPriorityWrites);
return this;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4102e75..edb8924 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the RequestProcessor interface.
*/
+@Getter(AccessLevel.PACKAGE)
public class BookieRequestProcessor implements RequestProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(BookieRequestProcessor.class);
@@ -94,7 +95,6 @@ public class BookieRequestProcessor implements
RequestProcessor {
/**
* The threadpool used to execute all read entry requests issued to this
server.
*/
- @Getter(AccessLevel.PACKAGE)
private final OrderedExecutor readThreadPool;
/**
@@ -111,7 +111,6 @@ public class BookieRequestProcessor implements
RequestProcessor {
* The threadpool used to execute all long poll requests issued to this
server
* after they are done waiting.
*/
- @Getter(AccessLevel.PACKAGE)
private final OrderedExecutor longPollThreadPool;
/**
@@ -127,8 +126,8 @@ public class BookieRequestProcessor implements
RequestProcessor {
// Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
- final OpStatsLogger addRequestStats;
- final OpStatsLogger addEntryStats;
+ private final OpStatsLogger addRequestStats;
+ private final OpStatsLogger addEntryStats;
final OpStatsLogger readRequestStats;
final OpStatsLogger readEntryStats;
final OpStatsLogger fenceReadRequestStats;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index e84bbee..9cd9fd5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -58,9 +58,11 @@ public abstract class PacketProcessorBaseV3 extends
SafeRunnable {
public void operationComplete(ChannelFuture future) throws
Exception {
long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
if (!future.isSuccess()) {
-
requestProcessor.channelWriteStats.registerFailedEvent(writeElapsedNanos,
TimeUnit.NANOSECONDS);
+ requestProcessor.getChannelWriteStats()
+ .registerFailedEvent(writeElapsedNanos,
TimeUnit.NANOSECONDS);
} else {
-
requestProcessor.channelWriteStats.registerSuccessfulEvent(writeElapsedNanos,
TimeUnit.NANOSECONDS);
+ requestProcessor.getChannelWriteStats()
+ .registerSuccessfulEvent(writeElapsedNanos,
TimeUnit.NANOSECONDS);
}
if (StatusCode.EOK == code) {
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos),
TimeUnit.NANOSECONDS);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index eaf2473..f5af75a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.Recycler;
@@ -56,12 +57,13 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
@Override
protected void processPacket() {
- if (requestProcessor.bookie.isReadOnly()) {
+ if (requestProcessor.getBookie().isReadOnly()
+ && !(request.isHighPriority() &&
requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
LOG.warn("BookieServer is running in readonly mode,"
+ " so rejecting the request from the client!");
sendResponse(BookieProtocol.EREADONLY,
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
- requestProcessor.addRequestStats);
+ requestProcessor.getAddRequestStats());
request.release();
request.recycle();
return;
@@ -72,9 +74,9 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
ByteBuf addData = request.getData();
try {
if (request.isRecoveryAdd()) {
- requestProcessor.bookie.recoveryAddEntry(addData, this,
channel, request.getMasterKey());
+ requestProcessor.getBookie().recoveryAddEntry(addData, this,
channel, request.getMasterKey());
} else {
- requestProcessor.bookie.addEntry(addData, false, this,
channel, request.getMasterKey());
+ requestProcessor.getBookie().addEntry(addData, false, this,
channel, request.getMasterKey());
}
} catch (OperationRejectedException e) {
// Avoid to log each occurence of this exception as this can
happen when the ledger storage is
@@ -102,11 +104,11 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
}
if (rc != BookieProtocol.EOK) {
-
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
sendResponse(rc,
ResponseBuilder.buildErrorResponse(rc, request),
- requestProcessor.addRequestStats);
+ requestProcessor.getAddRequestStats());
request.recycle();
}
}
@@ -115,15 +117,15 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
if (BookieProtocol.EOK == rc) {
-
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+
requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
} else {
-
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
}
sendResponse(rc,
ResponseBuilder.buildAddResponse(request),
- requestProcessor.addRequestStats);
+ requestProcessor.getAddRequestStats());
request.recycle();
recycle();
}
@@ -134,7 +136,8 @@ class WriteEntryProcessor extends
PacketProcessorBase<ParsedAddRequest> implemen
request.getLedgerId(), request.getEntryId());
}
- private void recycle() {
+ @VisibleForTesting
+ void recycle() {
reset();
recyclerHandle.recycle(this);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index c0e097a..578d666 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -65,7 +65,9 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
return addResponse.build();
}
- if (requestProcessor.bookie.isReadOnly()) {
+ if (requestProcessor.getBookie().isReadOnly()
+ && !(RequestUtils.isHighPriority(request)
+ &&
requestProcessor.getBookie().isAvailableForHighPriorityWrites())) {
logger.warn("BookieServer is running as readonly mode, so
rejecting the request from the client!");
addResponse.setStatus(StatusCode.EREADONLY);
return addResponse.build();
@@ -76,10 +78,10 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
if (BookieProtocol.EOK == rc) {
-
requestProcessor.addEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
+
requestProcessor.getAddEntryStats().registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
} else {
-
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
+
requestProcessor.getAddEntryStats().registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
}
@@ -101,7 +103,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
.setStatus(addResponse.getStatus())
.setAddResponse(addResponse);
Response resp = response.build();
- sendResponse(status, resp, requestProcessor.addRequestStats);
+ sendResponse(status, resp,
requestProcessor.getAddRequestStats());
}
};
final EnumSet<WriteFlag> writeFlags;
@@ -116,9 +118,9 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
ByteBuf entryToAdd =
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
try {
if (RequestUtils.hasFlag(addRequest,
AddRequest.Flag.RECOVERY_ADD)) {
- requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb,
channel, masterKey);
+ requestProcessor.getBookie().recoveryAddEntry(entryToAdd, wcb,
channel, masterKey);
} else {
- requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync,
wcb, channel, masterKey);
+ requestProcessor.getBookie().addEntry(entryToAdd,
ackBeforeSync, wcb, channel, masterKey);
}
status = StatusCode.EOK;
} catch (OperationRejectedException e) {
@@ -167,7 +169,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 {
.setAddResponse(addResponse);
Response resp = response.build();
sendResponse(addResponse.getStatus(), resp,
- requestProcessor.addRequestStats);
+ requestProcessor.getAddRequestStats());
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index ad7ba1e..ce084b3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -167,9 +167,6 @@ public class BookieStorageThresholdTest extends
BookKeeperClusterTestCase {
final CountDownLatch diskWritable = new CountDownLatch(1);
final CountDownLatch diskFull = new CountDownLatch(1);
ledgerDirsManager.addLedgerDirsListener(new LedgerDirsListener() {
- @Override
- public void fatalError() {
- }
@Override
public void diskWritable(File disk) {
@@ -177,25 +174,10 @@ public class BookieStorageThresholdTest extends
BookKeeperClusterTestCase {
}
@Override
- public void diskJustWritable(File disk) {
- }
-
- @Override
public void diskFull(File disk) {
diskFull.countDown();
}
- @Override
- public void diskFailed(File disk) {
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- }
-
- @Override
- public void allDisksFull() {
- }
});
// Dependency Injected class
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 4efc69f..1731b9b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -181,20 +181,42 @@ public class TestLedgerDirsManager {
@Test
public void testLedgerDirsMonitorDuringTransition() throws Exception {
+ testLedgerDirsMonitorDuringTransition(true);
+ }
+
+ @Test
+ public void testHighPriorityWritesDisallowedDuringTransition() throws
Exception {
+ testLedgerDirsMonitorDuringTransition(false);
+ }
+
+ private void testLedgerDirsMonitorDuringTransition(boolean
highPriorityWritesAllowed) throws Exception {
+ if (!highPriorityWritesAllowed) {
+ ledgerMonitor.shutdown();
+ conf.setMinUsableSizeForHighPriorityWrites(curDir.getUsableSpace()
+ 1024);
+ dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+ new DiskChecker(conf.getDiskUsageThreshold(),
conf.getDiskUsageWarnThreshold()), statsLogger);
+ ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker,
dirsManager);
+ ledgerMonitor.init();
+ }
+
MockLedgerDirsListener mockLedgerDirsListener = new
MockLedgerDirsListener();
dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
ledgerMonitor.start();
assertFalse(mockLedgerDirsListener.readOnly);
- mockDiskChecker.setUsage(threshold + 0.05f);
+ assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
+ mockDiskChecker.setUsage(threshold + 0.05f);
executorController.advance(Duration.ofMillis(diskCheckInterval));
+
assertTrue(mockLedgerDirsListener.readOnly);
+ assertEquals(highPriorityWritesAllowed,
mockLedgerDirsListener.highPriorityWritesAllowed);
mockDiskChecker.setUsage(threshold - 0.05f);
executorController.advance(Duration.ofMillis(diskCheckInterval));
assertFalse(mockLedgerDirsListener.readOnly);
+ assertTrue(mockLedgerDirsListener.highPriorityWritesAllowed);
}
@Test
@@ -427,6 +449,7 @@ public class TestLedgerDirsManager {
private class MockLedgerDirsListener implements LedgerDirsListener {
+ public volatile boolean highPriorityWritesAllowed;
public volatile boolean readOnly;
public MockLedgerDirsListener() {
@@ -434,38 +457,26 @@ public class TestLedgerDirsManager {
}
@Override
- public void diskFailed(File disk) {
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- }
-
- @Override
- public void diskFull(File disk) {
- }
-
- @Override
public void diskWritable(File disk) {
readOnly = false;
+ highPriorityWritesAllowed = true;
}
@Override
public void diskJustWritable(File disk) {
readOnly = false;
+ highPriorityWritesAllowed = true;
}
@Override
- public void allDisksFull() {
- readOnly = true;
- }
-
- @Override
- public void fatalError() {
+ public void allDisksFull(boolean highPriorityWritesAllowed) {
+ this.readOnly = true;
+ this.highPriorityWritesAllowed = highPriorityWritesAllowed;
}
public void reset() {
readOnly = false;
+ highPriorityWritesAllowed = true;
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index e6cb93d..d9fa8cc 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
-import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -81,7 +80,7 @@ public class TestSyncThread {
ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
conf.setFlushInterval(flushInterval);
CheckpointSource checkpointSource = new DummyCheckpointSource();
- LedgerDirsListener listener = new DummyLedgerDirsListener();
+ LedgerDirsListener listener = new LedgerDirsListener() {};
final CountDownLatch checkpointCalledLatch = new CountDownLatch(1);
final CountDownLatch checkpointLatch = new CountDownLatch(1);
@@ -154,7 +153,7 @@ public class TestSyncThread {
ServerConfiguration conf =
TestBKConfiguration.newServerConfiguration();
conf.setFlushInterval(flushInterval);
CheckpointSource checkpointSource = new DummyCheckpointSource();
- LedgerDirsListener listener = new DummyLedgerDirsListener();
+ LedgerDirsListener listener = new LedgerDirsListener() {};
final AtomicInteger checkpointCount = new AtomicInteger(0);
LedgerStorage storage = new DummyLedgerStorage() {
@@ -200,7 +199,7 @@ public class TestSyncThread {
conf.setFlushInterval(flushInterval);
CheckpointSource checkpointSource = new DummyCheckpointSource();
final CountDownLatch fatalLatch = new CountDownLatch(1);
- LedgerDirsListener listener = new DummyLedgerDirsListener() {
+ LedgerDirsListener listener = new LedgerDirsListener() {
@Override
public void fatalError() {
fatalLatch.countDown();
@@ -232,9 +231,9 @@ public class TestSyncThread {
conf.setFlushInterval(flushInterval);
CheckpointSource checkpointSource = new DummyCheckpointSource();
final CountDownLatch diskFullLatch = new CountDownLatch(1);
- LedgerDirsListener listener = new DummyLedgerDirsListener() {
+ LedgerDirsListener listener = new LedgerDirsListener() {
@Override
- public void allDisksFull() {
+ public void allDisksFull(boolean highPriorityWritesAllowed) {
diskFullLatch.countDown();
}
};
@@ -363,34 +362,4 @@ public class TestSyncThread {
}
}
- private static class DummyLedgerDirsListener
- implements LedgerDirsManager.LedgerDirsListener {
- @Override
- public void diskFailed(File disk) {
- }
-
- @Override
- public void diskAlmostFull(File disk) {
- }
-
- @Override
- public void diskFull(File disk) {
- }
-
- @Override
- public void allDisksFull() {
- }
-
- @Override
- public void fatalError() {
- }
-
- @Override
- public void diskWritable(File disk) {
- }
-
- @Override
- public void diskJustWritable(File disk) {
- }
- }
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
new file mode 100644
index 0000000..5901c2f
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
+import org.apache.bookkeeper.proto.BookieProtocol.Response;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessor}.
+ */
+public class WriteEntryProcessorTest {
+
+ private ParsedAddRequest request;
+ private WriteEntryProcessor processor;
+ private Channel channel;
+ private BookieRequestProcessor requestProcessor;
+ private Bookie bookie;
+
+ @Before
+ public void setup() {
+ request = ParsedAddRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 1,
+ (short) 0,
+ new byte[0],
+ Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+ channel = mock(Channel.class);
+ bookie = mock(Bookie.class);
+ requestProcessor = mock(BookieRequestProcessor.class);
+ when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getAddEntryStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+ when(requestProcessor.getAddRequestStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+ processor = WriteEntryProcessor.create(
+ request,
+ channel,
+ requestProcessor);
+ }
+
+ private void reinitRequest(short flags) {
+ request.release();
+ request.recycle();
+ processor.recycle();
+
+ request = ParsedAddRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 1,
+ flags,
+ new byte[0],
+ Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
+ processor = WriteEntryProcessor.create(
+ request,
+ channel,
+ requestProcessor);
+ }
+
+ @Test
+ public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return null;
+ }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(),
any(ChannelPromise.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+ response.release();
+ response.recycle();
+ }
+
+ @Test
+ public void
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws
Exception {
+ reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return null;
+ }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(),
any(ChannelPromise.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(BookieProtocol.EREADONLY, response.getErrorCode());
+
+ response.release();
+ response.recycle();
+ }
+
+ @Test
+ public void
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws
Exception {
+ reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ processor.writeComplete(0, request.ledgerId, request.entryId,
null, null);
+ return null;
+ }).when(bookie).addEntry(any(ByteBuf.class), eq(false),
same(processor), same(channel), eq(new byte[0]));
+
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return null;
+ }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), same(processor),
same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(),
any(ChannelPromise.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+ response.release();
+ response.recycle();
+ }
+
+ @Test
+ public void testNormalWritesOnWritableBookie() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ processor.writeComplete(0, request.ledgerId, request.entryId,
null, null);
+ return null;
+ }).when(bookie).addEntry(any(ByteBuf.class), eq(false),
same(processor), same(channel), eq(new byte[0]));
+
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return null;
+ }).when(channel).writeAndFlush(any(), any(ChannelPromise.class));
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), same(processor),
same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(),
any(ChannelPromise.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(BookieProtocol.EOK, response.getErrorCode());
+
+ response.release();
+ response.recycle();
+ }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
new file mode 100644
index 0000000..8f54ddb
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link WriteEntryProcessorV3}.
+ */
+public class WriteEntryProcessorV3Test {
+
+ private Request request;
+ private WriteEntryProcessorV3 processor;
+ private Channel channel;
+ private BookieRequestProcessor requestProcessor;
+ private Bookie bookie;
+
+ @Before
+ public void setup() {
+ request = Request.newBuilder()
+ .setHeader(BKPacketHeader.newBuilder()
+ .setTxnId(System.currentTimeMillis())
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .build())
+ .setAddRequest(AddRequest.newBuilder()
+ .setLedgerId(System.currentTimeMillis())
+ .setEntryId(System.currentTimeMillis() + 1)
+ .setBody(ByteString.copyFromUtf8("test-entry-data"))
+ .setMasterKey(ByteString.copyFrom(new byte[0]))
+ .build())
+ .build();
+ channel = mock(Channel.class);
+ bookie = mock(Bookie.class);
+ requestProcessor = mock(BookieRequestProcessor.class);
+ when(requestProcessor.getBookie()).thenReturn(bookie);
+ when(requestProcessor.getAddEntryStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_entry"));
+ when(requestProcessor.getAddRequestStats())
+
.thenReturn(NullStatsLogger.INSTANCE.getOpStatsLogger("add_requests"));
+ processor = new WriteEntryProcessorV3(
+ request,
+ channel,
+ requestProcessor);
+ }
+
+ private void reinitRequest(int priority) {
+ request = Request.newBuilder(request)
+ .setHeader(BKPacketHeader.newBuilder(request.getHeader())
+ .setPriority(priority)
+ .build())
+ .build();
+
+ processor = new WriteEntryProcessorV3(
+ request,
+ channel,
+ requestProcessor);
+ }
+
+ @Test
+ public void testNoneHighPriorityWritesOnReadOnlyBookie() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.EREADONLY, response.getStatus());
+ }
+
+ @Test
+ public void
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesDisallowed() throws
Exception {
+ reinitRequest(100);
+
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(bookie.isAvailableForHighPriorityWrites()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.EREADONLY, response.getStatus());
+ }
+
+ @Test
+ public void
testHighPriorityWritesOnReadOnlyBookieWhenHighPriorityWritesAllowed() throws
Exception {
+ reinitRequest(BookieProtocol.FLAG_HIGH_PRIORITY);
+
+ when(bookie.isReadOnly()).thenReturn(true);
+ when(bookie.isAvailableForHighPriorityWrites()).thenReturn(true);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
+ doAnswer(invocationOnMock -> {
+ WriteCallback wc = invocationOnMock.getArgument(2);
+
+ wc.writeComplete(
+ 0,
+ request.getAddRequest().getLedgerId(),
+ request.getAddRequest().getEntryId(),
+ null,
+ null);
+ return null;
+ }).when(bookie).addEntry(
+ any(ByteBuf.class),
+ eq(false),
+ any(WriteCallback.class),
+ same(channel),
+ eq(new byte[0]));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class),
same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.EOK, response.getStatus());
+ }
+
+ @Test
+ public void testNormalWritesOnWritableBookie() throws Exception {
+ when(bookie.isReadOnly()).thenReturn(false);
+ when(channel.voidPromise()).thenReturn(mock(ChannelPromise.class));
+
when(channel.writeAndFlush(any())).thenReturn(mock(ChannelPromise.class));
+ doAnswer(invocationOnMock -> {
+ WriteCallback wc = invocationOnMock.getArgument(2);
+
+ wc.writeComplete(
+ 0,
+ request.getAddRequest().getLedgerId(),
+ request.getAddRequest().getEntryId(),
+ null,
+ null);
+ return null;
+ }).when(bookie).addEntry(
+ any(ByteBuf.class), eq(false), any(WriteCallback.class),
same(channel), eq(new byte[0]));
+
+ ChannelPromise promise = new DefaultChannelPromise(channel);
+ AtomicReference<Object> writtenObject = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocationOnMock -> {
+ writtenObject.set(invocationOnMock.getArgument(0));
+ latch.countDown();
+ return promise;
+ }).when(channel).writeAndFlush(any());
+
+ processor.run();
+
+ verify(bookie, times(1))
+ .addEntry(any(ByteBuf.class), eq(false), any(WriteCallback.class),
same(channel), eq(new byte[0]));
+ verify(channel, times(1)).writeAndFlush(any(Response.class));
+
+ latch.await();
+
+ assertTrue(writtenObject.get() instanceof Response);
+ Response response = (Response) writtenObject.get();
+ assertEquals(StatusCode.EOK, response.getStatus());
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].