ivankelly closed pull request #898: Bookies should prioritize recovery
reads/writes
URL: https://github.com/apache/bookkeeper/pull/898
This pull request was merged from a forked repository (a "foreign" pull
request). Because GitHub hides the original diff once such a pull request
is merged, the full diff is reproduced below for the sake of provenance:
diff --git a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
index ffd0f422c..993b4dbb8 100644
--- a/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
+++ b/bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
@@ -90,8 +90,9 @@ message ReadRequest {
enum Flag {
FENCE_LEDGER = 1;
ENTRY_PIGGYBACK = 2;
+ HIGH_PRIORITY = 3;
}
- optional Flag flag = 100;
+ repeated Flag flag = 100;
required int64 ledgerId = 1;
// entryId will be -1 for reading the LAST_ADD_CONFIRMED entry.
required int64 entryId = 2;
@@ -106,8 +107,9 @@ message ReadRequest {
message AddRequest {
enum Flag {
RECOVERY_ADD = 1;
+ HIGH_PRIORITY = 2;
}
- optional Flag flag = 100;
+ repeated Flag flag = 100;
required int64 ledgerId = 1;
required int64 entryId = 2;
required bytes masterKey = 3;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index ea43a9646..027a04dda 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -95,6 +95,7 @@
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -825,7 +826,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
buffer.release();
future.complete(null);
- }, null);
+ }, null, BookieProtocol.FLAG_NONE);
try {
future.get();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index dcface641..5eb00d2be 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -386,7 +386,8 @@ public boolean hasNext() {
try {
CompletableFuture<Enumeration<LedgerEntry>> result = new
CompletableFuture<>();
- handle.asyncReadEntriesInternal(nextEntryId, nextEntryId,
new SyncReadCallback(result), null);
+ handle.asyncReadEntriesInternal(nextEntryId, nextEntryId,
+ new
SyncReadCallback(result), null, false);
currentEntry =
SyncCallbackUtils.waitForResult(result).nextElement();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index 049492bab..6f232cf61 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -33,6 +33,7 @@
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.slf4j.Logger;
@@ -198,8 +199,8 @@ private void verifyLedgerFragment(LedgerFragment fragment,
} else if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
- bookieClient.readEntry(bookie, fragment
- .getLedgerId(), firstStored, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored,
+ manycb, null, BookieProtocol.FLAG_NONE);
} else {
if (lastStored <= firstStored) {
cb.operationComplete(Code.IncorrectParameterException, null);
@@ -239,9 +240,8 @@ private void verifyLedgerFragment(LedgerFragment fragment,
}
ReadManyEntriesCallback manycb = new
ReadManyEntriesCallback(entriesToBeVerified.size(),
fragment, cb);
-
for (Long entryID: entriesToBeVerified) {
- bookieClient.readEntry(bookie, fragment.getLedgerId(),
entryID, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(),
entryID, manycb, null, BookieProtocol.FLAG_NONE);
}
}
}
@@ -382,7 +382,8 @@ public void operationComplete(int rc, Boolean result) {
DistributionSchedule.WriteSet writeSet =
lh.getDistributionSchedule().getWriteSet(entryToRead);
for (int i = 0; i < writeSet.size(); i++) {
BookieSocketAddress addr =
curEnsemble.get(writeSet.get(i));
- bookieClient.readEntry(addr, lh.getId(), entryToRead,
eecb, null);
+ bookieClient.readEntry(addr, lh.getId(), entryToRead,
+ eecb, null,
BookieProtocol.FLAG_NONE);
}
writeSet.recycle();
return;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 7047b37bb..1e2466ace 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -652,7 +652,7 @@ public void asyncReadEntries(long firstEntry, long
lastEntry, ReadCallback cb, O
return;
}
- asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+ asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}
/**
@@ -691,7 +691,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry,
long lastEntry, ReadCal
return;
}
- asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx);
+ asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false);
}
/**
@@ -717,7 +717,7 @@ public void asyncReadUnconfirmedEntries(long firstEntry,
long lastEntry, ReadCal
return FutureUtils.exception(new BKReadException());
}
- return readEntriesInternalAsync(firstEntry, lastEntry);
+ return readEntriesInternalAsync(firstEntry, lastEntry, false);
}
/**
@@ -752,12 +752,13 @@ public void asyncReadUnconfirmedEntries(long firstEntry,
long lastEntry, ReadCal
return FutureUtils.exception(new BKIncorrectParameterException());
}
- return readEntriesInternalAsync(firstEntry, lastEntry);
+ return readEntriesInternalAsync(firstEntry, lastEntry, false);
}
- void asyncReadEntriesInternal(long firstEntry, long lastEntry,
ReadCallback cb, Object ctx) {
+ void asyncReadEntriesInternal(long firstEntry, long lastEntry,
ReadCallback cb,
+ Object ctx, boolean isRecoveryRead) {
if (!bk.isClosed()) {
- readEntriesInternalAsync(firstEntry, lastEntry)
+ readEntriesInternalAsync(firstEntry, lastEntry, isRecoveryRead)
.whenCompleteAsync(new FutureEventListener<LedgerEntries>() {
@Override
public void onSuccess(LedgerEntries entries) {
@@ -802,7 +803,7 @@ public void asyncReadLastEntry(ReadCallback cb, Object ctx)
{
// Ledger was empty, so there is no last entry to read
cb.readComplete(BKException.Code.NoSuchEntryException, this, null,
ctx);
} else {
- asyncReadEntriesInternal(lastEntryId, lastEntryId, cb, ctx);
+ asyncReadEntriesInternal(lastEntryId, lastEntryId, cb, ctx, false);
}
}
@@ -821,8 +822,9 @@ public LedgerEntry readLastEntry()
}
CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
- long lastEntry) {
- PendingReadOp op = new PendingReadOp(this, bk.getScheduler(),
firstEntry, lastEntry);
+ long lastEntry,
+ boolean
isRecoveryRead) {
+ PendingReadOp op = new PendingReadOp(this, bk.getScheduler(),
firstEntry, lastEntry, isRecoveryRead);
if (!bk.isClosed()) {
bk.getMainWorkerPool().submitOrdered(ledgerId, op);
} else {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index adf67a8d5..67334aaab 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -18,6 +18,9 @@
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_HIGH_PRIORITY;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
import com.google.common.collect.ImmutableMap;
@@ -32,7 +35,6 @@
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -124,7 +126,7 @@ long getEntryId() {
}
void sendWriteRequest(int bookieIndex) {
- int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD :
BookieProtocol.FLAG_NONE;
+ int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY :
FLAG_NONE;
lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex),
lh.ledgerId, lh.ledgerKey,
entryId, toSend, this, bookieIndex, flags);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 0276ef4d2..7f2860bc7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -40,6 +40,7 @@
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
import org.apache.bookkeeper.proto.checksum.DigestManager;
@@ -77,6 +78,7 @@
final int requiredBookiesMissingEntryForRecovery;
final boolean isRecoveryRead;
+
boolean parallelRead = false;
final AtomicBoolean complete = new AtomicBoolean(false);
@@ -568,8 +570,10 @@ void sendReadTo(int bookieIndex, BookieSocketAddress to,
LedgerEntryRequest entr
lh.throttler.acquire();
}
+ int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY :
BookieProtocol.FLAG_NONE;
lh.bk.getBookieClient().readEntry(to, lh.ledgerId,
entry.entryImpl.getEntryId(),
- this, new ReadContext(bookieIndex, to,
entry));
+ this, new ReadContext(bookieIndex,
to, entry),
+ flags);
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
index d0e116e39..32d7ffe6f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedOp.java
@@ -62,17 +62,17 @@ public void initiate() {
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
- this, i);
+ this, i, BookieProtocol.FLAG_NONE);
}
}
public void initiateWithFencing() {
for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
-
lh.bk.getBookieClient().readEntryAndFenceLedger(lh.metadata.currentEnsemble.get(i),
- lh.ledgerId,
- lh.ledgerKey,
-
BookieProtocol.LAST_ADD_CONFIRMED,
- this, i);
+
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
+ lh.ledgerId,
+
BookieProtocol.LAST_ADD_CONFIRMED,
+ this, i,
BookieProtocol.FLAG_DO_FENCING,
+ lh.ledgerKey);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
index 55b7b5cc4..9d62f72d7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java
@@ -191,7 +191,7 @@ public void readLastConfirmedComplete(int rc, long
lastConfirmed, Object ctx) {
// Ledger was empty, so there is no last entry to read
cb.readComplete(BKException.Code.NoSuchEntryException,
ReadOnlyLedgerHandle.this, null, ctx);
} else {
- asyncReadEntriesInternal(lastConfirmed, lastConfirmed,
cb, ctx);
+ asyncReadEntriesInternal(lastConfirmed, lastConfirmed,
cb, ctx, false);
}
} else {
LOG.error("ReadException in asyncReadLastEntry, ledgerId:
{}, lac: {}, rc:{}",
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
index bcaf231be..441504c60 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TryReadLastConfirmedOp.java
@@ -54,7 +54,7 @@ public void initiate() {
lh.bk.getBookieClient().readEntry(lh.metadata.currentEnsemble.get(i),
lh.ledgerId,
BookieProtocol.LAST_ADD_CONFIRMED,
- this, i);
+ this, i, BookieProtocol.FLAG_NONE);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 526912b8c..04dd6be1f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -20,11 +20,8 @@
import static
org.apache.bookkeeper.conf.ClientConfiguration.CLIENT_AUTH_PROVIDER_FACTORY_CLASS;
import java.net.URL;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-
import javax.net.ssl.SSLEngine;
import org.apache.bookkeeper.feature.Feature;
@@ -33,8 +30,6 @@
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.util.EntryFormatter;
-import org.apache.bookkeeper.util.JsonUtil;
-import org.apache.bookkeeper.util.JsonUtil.ParseJsonException;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.StringEntryFormatter;
@@ -679,27 +674,4 @@ public String getTLSEnabledProtocols() {
* Trickery to allow inheritance with fluent style.
*/
protected abstract T getThis();
-
- /**
- * returns the string representation of json format of this config.
- *
- * @return
- * @throws ParseJsonException
- */
- public String asJson() throws ParseJsonException {
- return JsonUtil.toJson(toMap());
- }
-
- private Map<String, Object> toMap() {
- Map<String, Object> configMap = new HashMap<>();
- Iterator<String> iterator = this.getKeys();
- while (iterator.hasNext()) {
- String key = iterator.next().toString();
- Object property = this.getProperty(key);
- if (property != null) {
- configMap.put(key, property.toString());
- }
- }
- return configMap;
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 1623324d8..0bd9fe8e1 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -369,36 +369,6 @@ public void recycle() {
}
}
- public void readEntryAndFenceLedger(final BookieSocketAddress addr,
- final long ledgerId,
- final byte[] masterKey,
- final long entryId,
- final ReadEntryCallback cb,
- final Object ctx) {
- closeLock.readLock().lock();
- try {
- final PerChannelBookieClientPool client = lookupClient(addr);
- if (client == null) {
-
completeRead(getRc(BKException.Code.BookieHandleNotAvailableException),
- ledgerId, entryId, null, cb, ctx);
- return;
- }
-
- client.obtain(new GenericCallback<PerChannelBookieClient>() {
- @Override
- public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
- if (rc != BKException.Code.OK) {
- completeRead(rc, ledgerId, entryId, null, cb, ctx);
- return;
- }
- pcbc.readEntryAndFenceLedger(ledgerId, masterKey, entryId,
cb, ctx);
- }
- }, ledgerId);
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
public void readLac(final BookieSocketAddress addr, final long ledgerId,
final ReadLacCallback cb,
final Object ctx) {
closeLock.readLock().lock();
@@ -434,8 +404,13 @@ public void safeRun() {
}
}
+ public void readEntry(BookieSocketAddress addr, long ledgerId, long
entryId,
+ ReadEntryCallback cb, Object ctx, int flags) {
+ readEntry(addr, ledgerId, entryId, cb, ctx, flags, null);
+ }
+
public void readEntry(final BookieSocketAddress addr, final long ledgerId,
final long entryId,
- final ReadEntryCallback cb, final Object ctx) {
+ final ReadEntryCallback cb, final Object ctx, int
flags, byte[] masterKey) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
@@ -452,7 +427,7 @@ public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
- pcbc.readEntry(ledgerId, entryId, cb, ctx);
+ pcbc.readEntry(ledgerId, entryId, cb, ctx, flags,
masterKey);
}
}, ledgerId);
} finally {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 375cabaa4..4594ef161 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -184,7 +184,7 @@ public Object decode(ByteBuf packet)
byte[] masterKey = readMasterKey(packet);
return new BookieProtocol.ReadRequest(version, ledgerId,
entryId, flags, masterKey);
} else {
- return new BookieProtocol.ReadRequest(version, ledgerId,
entryId, flags);
+ return new BookieProtocol.ReadRequest(version, ledgerId,
entryId, flags, null);
}
case BookieProtocol.AUTH:
BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 3136c57ec..9ae63163a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -182,6 +182,7 @@ public static short getFlags(int packetHeader) {
short FLAG_NONE = 0x0;
short FLAG_DO_FENCING = 0x0001;
short FLAG_RECOVERY_ADD = 0x0002;
+ short FLAG_HIGH_PRIORITY = 0x0004;
/**
* A Bookie request object.
@@ -233,6 +234,10 @@ boolean hasMasterKey() {
return masterKey;
}
+ boolean isHighPriority() {
+ return (flags & FLAG_HIGH_PRIORITY) == FLAG_HIGH_PRIORITY;
+ }
+
@Override
public String toString() {
return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode,
ledgerId, entryId);
@@ -352,16 +357,12 @@ public void recycle() {
* A Request that reads data.
*/
class ReadRequest extends Request {
- ReadRequest(byte protocolVersion, long ledgerId, long entryId, short
flags) {
- init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
- }
-
ReadRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
init(protocolVersion, READENTRY, ledgerId, entryId, flags,
masterKey);
}
- boolean isFencingRequest() {
+ boolean isFencing() {
return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING;
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 24a3925c6..300bbae8f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -40,6 +40,7 @@
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC_REQUEST;
+import static org.apache.bookkeeper.proto.RequestUtils.hasFlag;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -51,6 +52,7 @@
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -109,6 +111,12 @@
*/
private final OrderedSafeExecutor longPollThreadPool;
+ /**
+ * The threadpool used to execute high priority requests. It needs to be
separate to
+ * the read thread, as the read thread is bounded.
+ */
+ private final ExecutorService highPriorityThreadPool;
+
/**
* The Timer used to time out requests for long polling.
*/
@@ -154,9 +162,15 @@ public BookieRequestProcessor(ServerConfiguration
serverCfg, Bookie bookie,
statsLogger);
this.longPollThreadPool = createExecutor(
this.serverCfg.getNumLongPollWorkerThreads(),
- "BookieLongPollThread",
- OrderedScheduler.NO_TASK_LIMIT,
- statsLogger);
+ "BookieLongPollThread-" + serverCfg.getBookiePort(),
+ OrderedScheduler.NO_TASK_LIMIT, statsLogger);
+ this.highPriorityThreadPool = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("BookieHighPriorityThread-" +
serverCfg.getBookiePort() + "-")
+ .setUncaughtExceptionHandler((t, ex) -> {
+ LOG.error("Exception in fencing thread {}", t, ex);
+ })
+ .build());
this.requestTimer = new HashedWheelTimer(
new
ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
this.serverCfg.getRequestTimerTickDurationMs(),
@@ -195,6 +209,7 @@ public void close() {
shutdownExecutor(writeThreadPool);
shutdownExecutor(readThreadPool);
shutdownExecutor(longPollThreadPool);
+ highPriorityThreadPool.shutdown();
}
private OrderedSafeExecutor createExecutor(
@@ -276,10 +291,12 @@ public void processRequest(Object msg, Channel c) {
// process packet
switch (r.getOpCode()) {
case BookieProtocol.ADDENTRY:
- processAddRequest(r, c);
+ assert(r instanceof BookieProtocol.ParsedAddRequest);
+ processAddRequest((BookieProtocol.ParsedAddRequest) r, c);
break;
case BookieProtocol.READENTRY:
- processReadRequest(r, c);
+ assert(r instanceof BookieProtocol.ReadRequest);
+ processReadRequest((BookieProtocol.ReadRequest) r, c);
break;
default:
LOG.error("Unknown op type {}, sending error",
r.getOpCode());
@@ -312,8 +329,14 @@ private void processReadLacRequestV3(final
BookkeeperProtocol.Request r, final C
private void processAddRequestV3(final BookkeeperProtocol.Request r, final
Channel c) {
WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this);
+
+ // If it's a high priority add (usually as part of recovery process),
we want to make sure it gets
+ // executed as fast as possible, so bypass the normal writeThreadPool
and execute in highPriorityThreadPool
+ boolean isHighPriority = hasFlag(r.getAddRequest(),
BookkeeperProtocol.AddRequest.Flag.HIGH_PRIORITY);
if (null == writeThreadPool) {
write.run();
+ } else if (isHighPriority) {
+ highPriorityThreadPool.submit(write);
} else {
try {
writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(),
write);
@@ -337,8 +360,6 @@ private void processAddRequestV3(final
BookkeeperProtocol.Request r, final Chann
}
private void processReadRequestV3(final BookkeeperProtocol.Request r,
final Channel c) {
- ExecutorService fenceThreadPool =
- null == readThreadPool ? null : readThreadPool.chooseThread(c);
ExecutorService lpThreadPool =
null == longPollThreadPool ? null :
longPollThreadPool.chooseThread(c);
ReadEntryProcessorV3 read;
@@ -347,7 +368,7 @@ private void processReadRequestV3(final
BookkeeperProtocol.Request r, final Chan
r,
c,
this,
- fenceThreadPool,
+ highPriorityThreadPool,
lpThreadPool,
requestTimer);
if (null == longPollThreadPool) {
@@ -356,9 +377,18 @@ private void processReadRequestV3(final
BookkeeperProtocol.Request r, final Chan
longPollThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
}
} else {
- read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool);
+ read = new ReadEntryProcessorV3(r, c, this,
highPriorityThreadPool);
+
+ // If it's a high priority read (fencing or as part of recovery
process), we want to make sure it
+ // gets executed as fast as possible, so bypass the normal
readThreadPool
+ // and execute in highPriorityThreadPool
+ boolean isHighPriority = hasFlag(r.getReadRequest(),
BookkeeperProtocol.ReadRequest.Flag.HIGH_PRIORITY)
+ || hasFlag(r.getReadRequest(),
BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
+
if (null == readThreadPool) {
read.run();
+ } else if (isHighPriority) {
+ highPriorityThreadPool.submit(read);
} else {
try {
readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
@@ -435,10 +465,15 @@ private void processGetBookieInfoRequestV3(final
BookkeeperProtocol.Request r, f
}
}
- private void processAddRequest(final BookieProtocol.Request r, final
Channel c) {
+ private void processAddRequest(final BookieProtocol.ParsedAddRequest r,
final Channel c) {
WriteEntryProcessor write = WriteEntryProcessor.create(r, c, this);
+
+ // If it's a high priority add (usually as part of recovery process),
we want to make sure it gets
+ // executed as fast as possible, so bypass the normal writeThreadPool
and execute in highPriorityThreadPool
if (null == writeThreadPool) {
write.run();
+ } else if (r.isHighPriority()) {
+ highPriorityThreadPool.submit(write);
} else {
try {
writeThreadPool.submitOrdered(r.getLedgerId(), write);
@@ -454,10 +489,16 @@ private void processAddRequest(final
BookieProtocol.Request r, final Channel c)
}
}
- private void processReadRequest(final BookieProtocol.Request r, final
Channel c) {
+ private void processReadRequest(final BookieProtocol.ReadRequest r, final
Channel c) {
ReadEntryProcessor read = ReadEntryProcessor.create(r, c, this);
+
+ // If it's a high priority read (fencing or as part of recovery
process), we want to make sure it
+ // gets executed as fast as possible, so bypass the normal
readThreadPool
+ // and execute in highPriorityThreadPool
if (null == readThreadPool) {
read.run();
+ } else if (r.isHighPriority() || r.isFencing()) {
+ highPriorityThreadPool.submit(read);
} else {
try {
readThreadPool.submitOrdered(r.getLedgerId(), read);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index c0214d338..a12020263 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -46,7 +46,6 @@
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityProviderFactoryFactory;
-import org.apache.bookkeeper.util.JsonUtil.ParseJsonException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,14 +81,6 @@ public BookieServer(ServerConfiguration conf, StatsLogger
statsLogger)
BookieException, UnavailableException, CompatibilityException,
SecurityException {
this.conf = conf;
validateUser(conf);
- String configAsString;
- try {
- configAsString = conf.asJson();
- LOG.info(configAsString);
- } catch (ParseJsonException pe) {
- LOG.error("Got ParseJsonException while converting Config to
JSONString", pe);
- }
-
this.statsLogger = statsLogger;
this.nettyServer = new BookieNettyServer(this.conf, null);
try {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index eda5b92a0..b7dee2d4a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -31,14 +31,14 @@
/**
* A base class for bookeeper packet processors.
*/
-abstract class PacketProcessorBase extends SafeRunnable {
+abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
private static final Logger logger =
LoggerFactory.getLogger(PacketProcessorBase.class);
- Request request;
+ T request;
Channel channel;
BookieRequestProcessor requestProcessor;
long enqueueNanos;
- protected void init(Request request, Channel channel,
BookieRequestProcessor requestProcessor) {
+ protected void init(T request, Channel channel, BookieRequestProcessor
requestProcessor) {
this.request = request;
this.channel = channel;
this.requestProcessor = requestProcessor;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 1173d77df..df8e08fb4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -580,8 +580,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final
long entryId, ByteBuf
.setBody(ByteString.copyFrom(toSendArray));
if (((short) options & BookieProtocol.FLAG_RECOVERY_ADD) ==
BookieProtocol.FLAG_RECOVERY_ADD) {
- addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ addBuilder.addFlag(AddRequest.Flag.RECOVERY_ADD);
}
+ if (((short) options & BookieProtocol.FLAG_HIGH_PRIORITY) ==
BookieProtocol.FLAG_HIGH_PRIORITY) {
+ addBuilder.addFlag(AddRequest.Flag.HIGH_PRIORITY);
+ }
+
request = Request.newBuilder()
.setHeader(headerBuilder)
@@ -604,55 +608,12 @@ void addEntry(final long ledgerId, byte[] masterKey,
final long entryId, ByteBuf
}
}
- public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
- final long entryId,
- ReadEntryCallback cb, Object ctx) {
- Object request = null;
- CompletionKey completionKey = null;
- if (useV2WireProtocol) {
- completionKey = acquireV2Key(ledgerId, entryId,
OperationType.READ_ENTRY);
- request = new
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
entryId,
- BookieProtocol.FLAG_DO_FENCING, masterKey);
- } else {
- final long txnId = getTxnId();
- completionKey = new V3CompletionKey(txnId,
OperationType.READ_ENTRY);
-
- // Build the request and calculate the total size to be included
in the packet.
- BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
- .setVersion(ProtocolVersion.VERSION_THREE)
- .setOperation(OperationType.READ_ENTRY)
- .setTxnId(txnId);
-
- ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .setMasterKey(ByteString.copyFrom(masterKey))
- .setFlag(ReadRequest.Flag.FENCE_LEDGER);
-
- request = Request.newBuilder()
- .setHeader(headerBuilder)
- .setReadRequest(readBuilder)
- .build();
- }
-
- ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
- CompletionValue existingValue =
completionObjects.putIfAbsent(completionKey, readCompletion);
- if (existingValue != null) {
- // There's a pending read request on same ledger/entry. Use the
multimap to track all of them
- synchronized (completionObjectsV2Conflicts) {
- completionObjectsV2Conflicts.put(completionKey,
readCompletion);
- }
- }
-
- writeAndFlush(channel, completionKey, request);
- }
-
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId, 0, (short) 0);
+ ledgerId, 0, (short) 0,
null);
completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
final long txnId = getTxnId();
@@ -686,7 +647,8 @@ public void readEntryWaitForLACUpdate(final long ledgerId,
final boolean piggyBackEntry,
ReadEntryCallback cb,
Object ctx) {
- readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
piggyBackEntry, cb, ctx);
+ readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
+ piggyBackEntry, cb, ctx, (short) 0, null);
}
/**
@@ -695,8 +657,11 @@ public void readEntryWaitForLACUpdate(final long ledgerId,
public void readEntry(final long ledgerId,
final long entryId,
ReadEntryCallback cb,
- Object ctx) {
- readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx);
+ Object ctx,
+ int flags,
+ byte[] masterKey) {
+ readEntryInternal(ledgerId, entryId, null, null, false,
+ cb, ctx, (short) flags, masterKey);
}
private void readEntryInternal(final long ledgerId,
@@ -705,12 +670,14 @@ private void readEntryInternal(final long ledgerId,
final Long timeOutInMillis,
final boolean piggyBackEntry,
final ReadEntryCallback cb,
- final Object ctx) {
+ final Object ctx,
+ int flags,
+ byte[] masterKey) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new
BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId, entryId, (short) 0);
+ ledgerId, entryId, (short) flags, masterKey);
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.READ_ENTRY);
} else {
final long txnId = getTxnId();
@@ -747,7 +714,20 @@ private void readEntryInternal(final long ledgerId,
ledgerId, entryId, null, ctx);
return;
}
- readBuilder =
readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
+ readBuilder =
readBuilder.addFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
+ }
+
+ // Only one flag can be set on the read requests
+ if (((short) flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING) {
+ readBuilder.addFlag(ReadRequest.Flag.FENCE_LEDGER);
+ if (masterKey == null) {
+ cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+ ledgerId, entryId, null, ctx);
+ return;
+ }
+ readBuilder.setMasterKey(ByteString.copyFrom(masterKey));
+ } else if (((short) flags & BookieProtocol.FLAG_HIGH_PRIORITY) == BookieProtocol.FLAG_HIGH_PRIORITY) {
+ readBuilder.addFlag(ReadRequest.Flag.HIGH_PRIORITY);
}
request = Request.newBuilder()
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index ee1e930e6..2d4c54040 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -31,16 +31,16 @@
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ReadEntryProcessor extends PacketProcessorBase {
+class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
private static final Logger LOG =
LoggerFactory.getLogger(ReadEntryProcessor.class);
- public static ReadEntryProcessor create(Request request, Channel channel,
- BookieRequestProcessor requestProcessor) {
+ public static ReadEntryProcessor create(ReadRequest request, Channel
channel,
+ BookieRequestProcessor
requestProcessor) {
ReadEntryProcessor rep = RECYCLER.get();
rep.init(request, channel, requestProcessor);
return rep;
@@ -48,9 +48,6 @@ public static ReadEntryProcessor create(Request request,
Channel channel,
@Override
protected void processPacket() {
- assert (request instanceof BookieProtocol.ReadRequest);
- BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) request;
-
if (LOG.isDebugEnabled()) {
LOG.debug("Received new read request: {}", request);
}
@@ -59,13 +56,13 @@ protected void processPacket() {
ByteBuf data = null;
try {
Future<Boolean> fenceResult = null;
- if (read.isFencingRequest()) {
+ if (request.isFencing()) {
LOG.warn("Ledger: {} fenced by: {}", request.getLedgerId(),
channel.remoteAddress());
- if (read.hasMasterKey()) {
- fenceResult =
requestProcessor.bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+ if (request.hasMasterKey()) {
+ fenceResult =
requestProcessor.bookie.fenceLedger(request.getLedgerId(),
request.getMasterKey());
} else {
- LOG.error("Password not provided, Not safe to fence {}",
read.getLedgerId());
+ LOG.error("Password not provided, Not safe to fence {}",
request.getLedgerId());
throw
BookieException.create(BookieException.Code.UnauthorizedAccessException);
}
}
@@ -94,17 +91,17 @@ protected void processPacket() {
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- LOG.error("Interrupting fence read entry {}", read, ie);
+ LOG.error("Interrupting fence read entry {}", request, ie);
errorCode = BookieProtocol.EIO;
data.release();
data = null;
} catch (ExecutionException ee) {
- LOG.error("Failed to fence read entry {}", read, ee);
+ LOG.error("Failed to fence read entry {}", request, ee);
errorCode = BookieProtocol.EIO;
data.release();
data = null;
} catch (TimeoutException te) {
- LOG.error("Timeout to fence read entry {}", read, te);
+ LOG.error("Timeout to fence read entry {}", request, te);
errorCode = BookieProtocol.EIO;
data.release();
data = null;
@@ -114,33 +111,35 @@ protected void processPacket() {
}
} catch (Bookie.NoLedgerException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Error reading {}", read, e);
+ LOG.debug("Error reading {}", request, e);
}
errorCode = BookieProtocol.ENOLEDGER;
} catch (Bookie.NoEntryException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Error reading {}", read, e);
+ LOG.debug("Error reading {}", request, e);
}
errorCode = BookieProtocol.ENOENTRY;
} catch (IOException e) {
- LOG.error("Error reading {}", read, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error reading {}", request, e);
+ }
errorCode = BookieProtocol.EIO;
} catch (BookieException e) {
- LOG.error("Unauthorized access to ledger {}", read.getLedgerId(),
e);
+ LOG.error("Unauthorized access to ledger {}",
request.getLedgerId(), e);
errorCode = BookieProtocol.EUA;
} catch (Throwable t) {
- LOG.error("Unexpected exception reading at {}:{} : {}",
read.getLedgerId(), read.getEntryId(),
- t.getMessage(), t);
+ LOG.error("Unexpected exception reading at {}:{} : {}",
request.getLedgerId(), request.getEntryId(),
+ t.getMessage(), t);
errorCode = BookieProtocol.EBADREQ;
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Read entry rc = {} for {}", errorCode, read);
+ LOG.trace("Read entry rc = {} for {}", errorCode, request);
}
if (errorCode == BookieProtocol.EOK) {
requestProcessor.readEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
- sendResponse(errorCode, ResponseBuilder.buildReadResponse(data,
read),
+ sendResponse(errorCode, ResponseBuilder.buildReadResponse(data,
request),
requestProcessor.readRequestStats);
} else {
@@ -148,7 +147,7 @@ protected void processPacket() {
requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
- sendResponse(errorCode,
ResponseBuilder.buildErrorResponse(errorCode, read),
+ sendResponse(errorCode,
ResponseBuilder.buildErrorResponse(errorCode, request),
requestProcessor.readRequestStats);
}
recycle();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
index c735bef26..0e303cb3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/RequestUtils.java
@@ -25,7 +25,7 @@
class RequestUtils {
public static boolean isFenceRequest(ReadRequest readRequest) {
- return readRequest.hasFlag() &&
readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER);
+ return hasFlag(readRequest, ReadRequest.Flag.FENCE_LEDGER);
}
public static boolean isLongPollReadRequest(ReadRequest readRequest) {
@@ -33,6 +33,25 @@ public static boolean isLongPollReadRequest(ReadRequest
readRequest) {
}
public static boolean shouldPiggybackEntry(ReadRequest readRequest) {
- return readRequest.hasFlag() &&
readRequest.getFlag().equals(ReadRequest.Flag.ENTRY_PIGGYBACK);
+ return hasFlag(readRequest, ReadRequest.Flag.ENTRY_PIGGYBACK);
}
+
+ static boolean hasFlag(BookkeeperProtocol.ReadRequest request, BookkeeperProtocol.ReadRequest.Flag flag) {
+ for (BookkeeperProtocol.ReadRequest.Flag f : request.getFlagList()) {
+ if (f == flag) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static boolean hasFlag(BookkeeperProtocol.AddRequest request, BookkeeperProtocol.AddRequest.Flag flag) {
+ for (BookkeeperProtocol.AddRequest.Flag f : request.getFlagList()) {
+ if (f == flag) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
index 164a11b42..993700092 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
@@ -27,7 +27,7 @@
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookieProtocol.ParsedAddRequest;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
@@ -36,7 +36,7 @@
/**
* Processes add entry requests.
*/
-class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback
{
+class WriteEntryProcessor extends PacketProcessorBase<ParsedAddRequest>
implements WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(WriteEntryProcessor.class);
@@ -47,8 +47,8 @@ protected void reset() {
startTimeNanos = -1L;
}
- public static WriteEntryProcessor create(Request request, Channel channel,
- BookieRequestProcessor requestProcessor) {
+ public static WriteEntryProcessor create(ParsedAddRequest request, Channel
channel,
+ BookieRequestProcessor
requestProcessor) {
WriteEntryProcessor wep = RECYCLER.get();
wep.init(request, channel, requestProcessor);
return wep;
@@ -56,40 +56,38 @@ public static WriteEntryProcessor create(Request request,
Channel channel,
@Override
protected void processPacket() {
- assert (request instanceof BookieProtocol.ParsedAddRequest);
- BookieProtocol.ParsedAddRequest add =
(BookieProtocol.ParsedAddRequest) request;
-
if (requestProcessor.bookie.isReadOnly()) {
LOG.warn("BookieServer is running in readonly mode,"
+ " so rejecting the request from the client!");
sendResponse(BookieProtocol.EREADONLY,
-
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add),
+
ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request),
requestProcessor.addRequestStats);
- add.release();
- add.recycle();
+ request.release();
+ request.recycle();
return;
}
startTimeNanos = MathUtils.nowInNano();
int rc = BookieProtocol.EOK;
- ByteBuf addData = add.getData();
+ ByteBuf addData = request.getData();
try {
- if (add.isRecoveryAdd()) {
- requestProcessor.bookie.recoveryAddEntry(addData, this,
channel, add.getMasterKey());
+ if (request.isRecoveryAdd()) {
+ requestProcessor.bookie.recoveryAddEntry(addData, this,
channel, request.getMasterKey());
} else {
- requestProcessor.bookie.addEntry(addData, false, this,
channel, add.getMasterKey());
+ requestProcessor.bookie.addEntry(addData, false, this,
channel, request.getMasterKey());
}
} catch (IOException e) {
- LOG.error("Error writing " + add, e);
+ LOG.error("Error writing {}", request, e);
rc = BookieProtocol.EIO;
} catch (BookieException.LedgerFencedException lfe) {
LOG.error("Attempt to write to fenced ledger", lfe);
rc = BookieProtocol.EFENCED;
} catch (BookieException e) {
- LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
+ LOG.error("Unauthorized access to ledger {}",
request.getLedgerId(), e);
rc = BookieProtocol.EUA;
} catch (Throwable t) {
- LOG.error("Unexpected exception while writing {}@{} : {}",
add.ledgerId, add.entryId, t.getMessage(), t);
+ LOG.error("Unexpected exception while writing {}@{} : {}",
+ request.ledgerId, request.entryId, t.getMessage(), t);
// some bad request which cause unexpected exception
rc = BookieProtocol.EBADREQ;
} finally {
@@ -100,9 +98,9 @@ protected void processPacket() {
requestProcessor.addEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos),
TimeUnit.NANOSECONDS);
sendResponse(rc,
- ResponseBuilder.buildErrorResponse(rc, add),
+ ResponseBuilder.buildErrorResponse(rc, request),
requestProcessor.addRequestStats);
- add.recycle();
+ request.recycle();
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
index f227d8828..7bd78d3f4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
@@ -114,7 +114,7 @@ public void writeComplete(int rc, long ledgerId, long
entryId,
byte[] masterKey = addRequest.getMasterKey().toByteArray();
ByteBuf entryToAdd =
Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer());
try {
- if (addRequest.hasFlag() &&
addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
+ if (RequestUtils.hasFlag(addRequest,
AddRequest.Flag.RECOVERY_ADD)) {
requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb,
channel, masterKey);
} else {
requestProcessor.bookie.addEntry(entryToAdd, ackBeforeSync,
wcb, channel, masterKey);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ConfigurationService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ConfigurationService.java
index 2ac9e8e6a..983da4aca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ConfigurationService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ConfigurationService.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -47,7 +48,8 @@ public HttpServiceResponse handle(HttpServiceRequest request)
throws Exception {
HttpServiceResponse response = new HttpServiceResponse();
// GET
if (HttpServer.Method.GET == request.getMethod()) {
- String jsonResponse = conf.asJson();
+ Map<String, Object> configMap = toMap(conf);
+ String jsonResponse = JsonUtil.toJson(configMap);
response.setBody(jsonResponse);
return response;
} else if (HttpServer.Method.PUT == request.getMethod()) {
@@ -73,4 +75,17 @@ public HttpServiceResponse handle(HttpServiceRequest
request) throws Exception {
}
}
+
+ private Map<String, Object> toMap(ServerConfiguration conf) {
+ Map<String, Object> configMap = new HashMap<>();
+ Iterator iterator = conf.getKeys();
+ while (iterator.hasNext()) {
+ String key = iterator.next().toString();
+ Object property = conf.getProperty(key);
+ if (property != null) {
+ configMap.put(key, property.toString());
+ }
+ }
+ return configMap;
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 0536ff804..0828341c8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -45,6 +45,7 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -526,7 +527,8 @@ private boolean verifyFullyReplicated(LedgerHandle lh, long
untilEntry) throws E
ReplicationVerificationCallback cb = new
ReplicationVerificationCallback(numRequests);
for (long i = startEntryId; i < endEntryId; i++) {
for (BookieSocketAddress addr : e.getValue()) {
- bkc.getBookieClient().readEntry(addr, lh.getId(), i, cb,
addr);
+ bkc.getBookieClient().readEntry(addr, lh.getId(), i,
+ cb, addr,
BookieProtocol.FLAG_NONE);
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index ffb85f58e..b7cc373d9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -390,56 +390,14 @@ private void setupWriteLedgerMetadata() {
@SuppressWarnings("unchecked")
protected void setupBookieClientReadEntry() {
- doAnswer(invokation -> {
- Object[] args = invokation.getArguments();
- BookkeeperInternalCallbacks.ReadEntryCallback callback =
- (BookkeeperInternalCallbacks.ReadEntryCallback) args[4];
- BookieSocketAddress bookieSocketAddress = (BookieSocketAddress)
args[0];
- long ledgerId = (Long) args[1];
- long entryId = (Long) args[3];
-
- executor.submitOrdered(ledgerId, () -> {
- DigestManager macManager = null;
- try {
- macManager = getDigestType(ledgerId);
- } catch (GeneralSecurityException gse){
- LOG.error("Initialize macManager fail", gse);
- }
- fencedLedgers.add(ledgerId);
- MockEntry mockEntry = null;
- try {
- mockEntry = getMockLedgerEntry(ledgerId,
bookieSocketAddress, entryId);
- } catch (BKException bke) {
- LOG.info("readEntryAndFenceLedger - occur BKException
{}@{} at {}", entryId, ledgerId,
- bookieSocketAddress);
- callback.readEntryComplete(bke.getCode(), ledgerId,
entryId, null, args[5]);
- }
- if (mockEntry != null) {
- LOG.info("readEntryAndFenceLedger - found mock entry {}@{}
at {}", entryId, ledgerId,
- bookieSocketAddress);
- ByteBufList entry =
macManager.computeDigestAndPackageForSending(entryId,
- mockEntry.lastAddConfirmed,
mockEntry.payload.length,
- Unpooled.wrappedBuffer(mockEntry.payload));
- callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, ByteBufList.coalesce(entry),
- args[5]);
- entry.release();
- } else {
- LOG.info("readEntryAndFenceLedger - no such mock entry
{}@{} at {}", entryId, ledgerId,
- bookieSocketAddress);
-
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId,
entryId, null, args[5]);
- }
- });
- return null;
- }).when(bookieClient).readEntryAndFenceLedger(any(), anyLong(), any(),
anyLong(),
- any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
-
- doAnswer(invokation -> {
+ Answer<Void> answer = invokation -> {
Object[] args = invokation.getArguments();
BookieSocketAddress bookieSocketAddress = (BookieSocketAddress)
args[0];
long ledgerId = (Long) args[1];
long entryId = (Long) args[2];
BookkeeperInternalCallbacks.ReadEntryCallback callback =
(BookkeeperInternalCallbacks.ReadEntryCallback) args[3];
+ boolean fenced = (((Integer) args[5]) &
BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
executor.submitOrdered(ledgerId, () -> {
DigestManager macManager = null;
@@ -456,6 +414,11 @@ protected void setupBookieClientReadEntry() {
bookieSocketAddress);
callback.readEntryComplete(bke.getCode(), ledgerId,
entryId, null, args[5]);
}
+
+ if (fenced) {
+ fencedLedgers.add(ledgerId);
+ }
+
if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}",
entryId, ledgerId, bookieSocketAddress);
ByteBufList entry =
macManager.computeDigestAndPackageForSending(entryId,
@@ -470,8 +433,13 @@ protected void setupBookieClientReadEntry() {
}
});
return null;
- }).when(bookieClient).readEntry(any(), anyLong(), anyLong(),
- any(BookkeeperInternalCallbacks.ReadEntryCallback.class), any());
+ };
+ doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(),
anyLong(),
+ any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+ any(), anyInt());
+ doAnswer(answer).when(bookieClient).readEntry(any(), anyLong(),
anyLong(),
+ any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
+ any(), anyInt(), any());
}
private byte[] extractEntryPayload(long ledgerId, long entryId,
ByteBufList toSend)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 954df02c1..cf693ed0f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -106,7 +106,8 @@ private void verifyEntries(LedgerHandle lh, long
startEntry, long untilEntry,
ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
VerificationCallback callback = new
VerificationCallback(addresses.size());
for (BookieSocketAddress addr : addresses) {
- bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
callback, addr);
+ bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
+ callback, addr, 0, null);
}
callback.latch.await();
assertEquals(expectedSuccess, callback.numSuccess.get());
@@ -123,7 +124,8 @@ private void verifyEntriesRange(LedgerHandle lh, long
startEntry, long untilEntr
ArrayList<BookieSocketAddress> addresses = md.getEnsemble(eid);
VerificationCallback callback = new
VerificationCallback(addresses.size());
for (BookieSocketAddress addr : addresses) {
- bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
callback, addr);
+ bkc.getBookieClient().readEntry(addr, lh.getId(), eid,
+ callback, addr, 0, null);
}
callback.latch.await();
assertTrue(expectedSuccess >= callback.numSuccess.get());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
index 612e75477..cc70e024a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestMaxSizeWorkersQueue.java
@@ -19,9 +19,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +57,7 @@ public TestMaxSizeWorkersQueue() {
baseConf.setMaxPendingAddRequestPerThread(1);
}
- @Test(timeout = 60000)
+ @Test
public void testReadRejected() throws Exception {
LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
byte[] content = new byte[100];
@@ -91,7 +100,7 @@ public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq,
assertEquals(BKException.Code.TooManyRequestsException,
rcSecondReadOperation.get());
}
- @Test(timeout = 60000)
+ @Test
public void testAddRejected() throws Exception {
LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
byte[] content = new byte[100];
@@ -122,4 +131,60 @@ public void addComplete(int rc, LedgerHandle lh, long
entryId, Object ctx) {
assertTrue(receivedTooManyRequestsException.get());
}
+
+ @Test
+ public void testRecoveryNotRejected() throws Exception {
+ LedgerHandle lh = bkc.createLedger(1, 1, digestType, new byte[0]);
+ byte[] content = new byte[100];
+
+ final int numEntriesToRead = 1000;
+ // Write few entries
+ for (int i = 0; i < numEntriesToRead; i++) {
+ lh.addEntry(content);
+ }
+
+ final int numLedgersToRecover = 10;
+ List<Long> ledgersToRecover = Lists.newArrayList();
+ for (int i = 0; i < numLedgersToRecover; i++) {
+ LedgerHandle lhr = bkc.createLedger(1, 1, digestType, new byte[0]);
+ lhr.addEntry(content);
+ // Leave the ledger in open state
+ ledgersToRecover.add(lhr.getId());
+ }
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final CyclicBarrier barrier = new CyclicBarrier(1 +
numLedgersToRecover);
+
+ List<Future<?>> futures = Lists.newArrayList();
+ futures.add(executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ barrier.await();
+ try {
+ lh.readEntries(0, numEntriesToRead - 1);
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ // Expected
+ }
+ return null;
+ }
+ }));
+
+ for (long ledgerId : ledgersToRecover) {
+ futures.add(executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ barrier.await();
+
+ // Recovery should always succeed
+ bkc.openLedger(ledgerId, digestType, new byte[0]);
+ return null;
+ }
+ }));
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 00dbb4bd5..9e230b6af 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -166,7 +166,7 @@ public void testAuthFail() throws Exception {
}
client.sendRequest(new
ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- 1L, 1L, (short) 0));
+ 1L, 1L, (short) 0, null));
Response response = client.takeResponse();
assertEquals("Should have failed",
response.getErrorCode(), BookieProtocol.EUA);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
new file mode 100644
index 000000000..3d7edb02a
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.junit.Test;
+
+/**
+ * Test utility methods from bookie request processor.
+ */
+public class TestBookieRequestProcessor {
+ @Test
+ public void testFlagsV3() {
+ ReadRequest read = ReadRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .addFlag(ReadRequest.Flag.FENCE_LEDGER).build();
+ assertTrue(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.ENTRY_PIGGYBACK));
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.HIGH_PRIORITY));
+
+ read = ReadRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .addFlag(ReadRequest.Flag.ENTRY_PIGGYBACK).build();
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+ assertTrue(RequestUtils.hasFlag(read, ReadRequest.Flag.ENTRY_PIGGYBACK));
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.HIGH_PRIORITY));
+
+ read = ReadRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .build();
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.FENCE_LEDGER));
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.ENTRY_PIGGYBACK));
+ assertFalse(RequestUtils.hasFlag(read, ReadRequest.Flag.HIGH_PRIORITY));
+
+ AddRequest add = AddRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .addFlag(AddRequest.Flag.RECOVERY_ADD)
+ .setMasterKey(ByteString.EMPTY)
+ .setBody(ByteString.EMPTY)
+ .build();
+ assertTrue(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+ assertFalse(RequestUtils.hasFlag(add, AddRequest.Flag.HIGH_PRIORITY));
+
+ add = AddRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .setMasterKey(ByteString.EMPTY)
+ .setBody(ByteString.EMPTY)
+ .build();
+ assertFalse(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+
+ add = AddRequest.newBuilder()
+ .setLedgerId(10).setEntryId(1)
+ .addFlag(AddRequest.Flag.RECOVERY_ADD)
+ .addFlag(AddRequest.Flag.HIGH_PRIORITY)
+ .setMasterKey(ByteString.EMPTY)
+ .setBody(ByteString.EMPTY)
+ .build();
+ assertTrue(RequestUtils.hasFlag(add, AddRequest.Flag.RECOVERY_ADD));
+ assertTrue(RequestUtils.hasFlag(add, AddRequest.Flag.HIGH_PRIORITY));
+ }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index d08dd8ad8..90788b1e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -277,7 +277,7 @@ public void safeRun() {
return;
}
- client.readEntryAndFenceLedger(1,
"00000111112222233333".getBytes(), 1, cb, null);
+ client.readEntry(1, 1, cb, null,
BookieProtocol.FLAG_DO_FENCING, "00000111112222233333".getBytes());
}
});
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 4cf25912d..7ecce0ba9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -162,7 +162,7 @@ public void testWriteGaps() throws Exception {
synchronized (arc) {
arc.wait(1000);
assertEquals(0, arc.rc);
- bc.readEntry(addr, 1, 1, recb, arc);
+ bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(1, arc.entry.getInt());
@@ -181,63 +181,63 @@ public void testWriteGaps() throws Exception {
notifyObject.wait();
}
synchronized (arc) {
- bc.readEntry(addr, 1, 6, recb, arc);
+ bc.readEntry(addr, 1, 6, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
- bc.readEntry(addr, 1, 7, recb, arc);
+ bc.readEntry(addr, 1, 7, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(7, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 1, recb, arc);
+ bc.readEntry(addr, 1, 1, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(1, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 2, recb, arc);
+ bc.readEntry(addr, 1, 2, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(2, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 3, recb, arc);
+ bc.readEntry(addr, 1, 3, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(3, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 4, recb, arc);
+ bc.readEntry(addr, 1, 4, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
- bc.readEntry(addr, 1, 11, recb, arc);
+ bc.readEntry(addr, 1, 11, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(11, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 5, recb, arc);
+ bc.readEntry(addr, 1, 5, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(0, arc.rc);
assertEquals(5, arc.entry.getInt());
}
synchronized (arc) {
- bc.readEntry(addr, 1, 10, recb, arc);
+ bc.readEntry(addr, 1, 10, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
- bc.readEntry(addr, 1, 12, recb, arc);
+ bc.readEntry(addr, 1, 12, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
synchronized (arc) {
- bc.readEntry(addr, 1, 13, recb, arc);
+ bc.readEntry(addr, 1, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
}
@@ -259,7 +259,7 @@ public void testNoLedger() throws Exception {
BookieClient bc = new BookieClient(new ClientConfiguration(),
eventLoopGroup, executor,
scheduler,
NullStatsLogger.INSTANCE);
synchronized (arc) {
- bc.readEntry(addr, 2, 13, recb, arc);
+ bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
}
diff --git
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 60d909600..9d39d3890 100644
---
a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++
b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -34,6 +34,7 @@
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager;
@@ -122,7 +123,8 @@ public void readEntryComplete(int rc, long lid, long eid,
ByteBuf buffer, Object
ArrayList<BookieSocketAddress> ensemble =
lh.getLedgerMetadata().getEnsemble(eid);
for (int i = 0; i < writeSet.size(); i++) {
int idx = writeSet.get(i);
- bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid,
readEntryCallback, ensemble.get(idx));
+ bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid,
readEntryCallback,
+ ensemble.get(idx),
BookieProtocol.FLAG_NONE);
}
}
@@ -226,7 +228,8 @@ public void readLacs(final LedgerHandle lh, long eid,
ArrayList<BookieSocketAddress> ensemble =
lh.getLedgerMetadata().getEnsemble(eid);
for (int i = 0; i < writeSet.size(); i++) {
int idx = writeSet.get(i);
- bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid,
readEntryCallback, ensemble.get(idx));
+ bookieClient.readEntry(ensemble.get(idx), lh.getId(), eid,
readEntryCallback,
+ ensemble.get(idx),
BookieProtocol.FLAG_NONE);
}
}
}
diff --git
a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
index d24a890b4..7540f7073 100644
---
a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
+++
b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.LedgerHandle
import org.apache.bookkeeper.client.LedgerMetadata
import org.apache.bookkeeper.conf.ClientConfiguration
import org.apache.bookkeeper.net.BookieSocketAddress
+import org.apache.bookkeeper.proto.BookieProtocol
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
import org.apache.bookkeeper.tests.BookKeeperClusterUtils
import org.apache.bookkeeper.tests.DockerUtils
@@ -125,7 +126,7 @@ class TestCompatRecoveryNoPassword {
for (long i = startEntryId; i < endEntryId; i++) {
for (BookieSocketAddress addr : e.getValue()) {
bookkeeper.getBookieClient()
- .readEntry(addr, lh.getId(), i, cb, addr)
+ .readEntry(addr, lh.getId(), i, cb, addr,
BookieProtocol.FLAG_NONE)
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services