This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ee0ddde BOOKKEEPER-772: Reorder Read Sequence
ee0ddde is described below
commit ee0dddee6849d1968500af666571df668d34393a
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 4 12:27:21 2017 +0800
BOOKKEEPER-772: Reorder Read Sequence
Descriptions of the changes in this PR:
- for the rack-aware placement policy, bookies in the same rack will be
preferred.
- for the region-aware placement policy, bookies in the same region will
be preferred.
- read-only or unavailable bookies (those with a high score in the bookie
failure history) will be placed at the end of the sequence.
This change is based on #220. Please review commit 1f7dccd for the
reorder change.
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <None>, Matteo Merli <None>
This closes #224 from sijie/reorder_reads
---
.../java/org/apache/bookkeeper/bookie/Bookie.java | 7 +
.../org/apache/bookkeeper/bookie/FileInfo.java | 155 ++++---
.../bookkeeper/bookie/IndexPersistenceMgr.java | 30 +-
.../bookie/InterleavedLedgerStorage.java | 25 +-
.../bookie/LastAddConfirmedUpdateNotification.java | 31 ++
.../org/apache/bookkeeper/bookie/LedgerCache.java | 6 +-
.../apache/bookkeeper/bookie/LedgerCacheImpl.java | 8 +
.../apache/bookkeeper/bookie/LedgerDescriptor.java | 3 +
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 9 +-
.../apache/bookkeeper/bookie/LedgerEntryPage.java | 33 +-
.../apache/bookkeeper/bookie/LedgerStorage.java | 14 +-
.../bookkeeper/bookie/ShortReadException.java | 37 ++
.../apache/bookkeeper/client/AsyncCallback.java | 16 +
.../org/apache/bookkeeper/client/BookKeeper.java | 22 +
.../bookkeeper/client/BookKeeperClientStats.java | 2 +
.../org/apache/bookkeeper/client/LedgerHandle.java | 94 +++++
.../apache/bookkeeper/client/PendingReadOp.java | 8 +-
...eadOp.java => ReadLastConfirmedAndEntryOp.java} | 457 +++++++++++----------
.../bookkeeper/conf/ClientConfiguration.java | 163 +++++++-
.../bookkeeper/conf/ServerConfiguration.java | 65 +++
.../org/apache/bookkeeper/proto/BookieClient.java | 46 ++-
.../bookkeeper/proto/BookieRequestProcessor.java | 53 ++-
.../proto/LongPollReadEntryProcessorV3.java | 226 ++++++++++
.../bookkeeper/proto/PerChannelBookieClient.java | 68 ++-
.../apache/bookkeeper/bookie/TestSyncThread.java | 7 +
.../client/TestReadLastConfirmedAndEntry.java | 267 ++++++++++++
.../client/TestReadLastConfirmedLongPoll.java | 169 ++++++++
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 7 +
.../bookkeeper/meta/LedgerManagerTestCase.java | 17 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 13 +
30 files changed, 1744 insertions(+), 314 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 789cf33..d607b53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -45,6 +45,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Observable;
+import java.util.Observer;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -1491,6 +1493,11 @@ public class Bookie extends BookieCriticalThread {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
return handle.getLastAddConfirmed();
}
+
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer) throws IOException {
+ LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
+ return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ }
// The rest of the code is test stuff
static class CounterCallback implements WriteCallback {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 5aeb385..597fcd3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -23,22 +23,21 @@ package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Observable;
+import java.util.Observer;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
/**
* This is the file handle for a ledger's index file that maps entry ids to
location.
* It is used by LedgerCache.
@@ -57,7 +56,7 @@ import io.netty.buffer.Unpooled;
* <b>Index page</b> is a fixed-length page, which contains serveral entries
which point to the offsets of data stored in entry loggers.
* </p>
*/
-class FileInfo {
+class FileInfo extends Observable {
private final static Logger LOG = LoggerFactory.getLogger(FileInfo.class);
static final int NO_MASTER_KEY = -1;
@@ -102,14 +101,34 @@ class FileInfo {
return lac;
}
- synchronized long setLastAddConfirmed(long lac) {
- if (null == this.lac || this.lac < lac) {
- this.lac = lac;
+ long setLastAddConfirmed(long lac) {
+ long lacToReturn;
+ synchronized (this) {
+ if (null == this.lac || this.lac < lac) {
+ this.lac = lac;
+ setChanged();
+ }
+ lacToReturn = this.lac;
+ }
+ LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
+
+
+ notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+ return lacToReturn;
+ }
+
+ synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC,
Observer observe) {
+ if ((null != lac && lac > previousLAC)
+ || isClosed || ((stateBits & STATE_FENCED_BIT) ==
STATE_FENCED_BIT)) {
+ LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
+ return null;
}
- return this.lac;
+
+ addObserver(observe);
+ return this;
}
- public File getLf() {
+ public synchronized File getLf() {
return lf;
}
@@ -170,15 +189,15 @@ class FileInfo {
}
bb.flip();
if (bb.getInt() != signature) {
- throw new IOException("Missing ledger signature");
+ throw new IOException("Missing ledger signature while reading
header for " + lf);
}
int version = bb.getInt();
if (version != headerVersion) {
- throw new IOException("Incompatible ledger version " +
version);
+ throw new IOException("Incompatible ledger version " + version
+ " while reading header for " + lf);
}
int length = bb.getInt();
if (length < 0) {
- throw new IOException("Length " + length + " is invalid");
+ throw new IOException("Length " + length + " is invalid while
reading header for " + lf);
} else if (length > bb.remaining()) {
throw new BufferUnderflowException();
}
@@ -187,11 +206,17 @@ class FileInfo {
stateBits = bb.getInt();
needFlushHeader = false;
} else {
- throw new IOException("Ledger index file does not exist");
+ throw new IOException("Ledger index file " + lf +" does not
exist");
}
}
- synchronized void checkOpen(boolean create) throws IOException {
+ @VisibleForTesting
+ void checkOpen(boolean create) throws IOException {
+ checkOpen(create, false);
+ }
+
+ private synchronized void checkOpen(boolean create, boolean
openBeforeClose)
+ throws IOException {
if (fc != null) {
return;
}
@@ -211,6 +236,10 @@ class FileInfo {
}
}
} else {
+ if (openBeforeClose) {
+ // if it is checking for close, skip reading header
+ return;
+ }
try {
readHeader();
} catch (BufferUnderflowException buf) {
@@ -246,19 +275,25 @@ class FileInfo {
* @return true if set fence succeed, otherwise false when
* it already fenced or failed to set fenced.
*/
- synchronized public boolean setFenced() throws IOException {
- checkOpen(false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try to set fenced state in file info {} : state bits
{}.", lf, stateBits);
- }
- if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) {
- // not fenced yet
- stateBits |= STATE_FENCED_BIT;
- needFlushHeader = true;
- return true;
- } else {
- return false;
+ public boolean setFenced() throws IOException {
+ boolean returnVal = false;
+ synchronized (this) {
+ checkOpen(false);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to set fenced state in file info {} : state
bits {}.", lf, stateBits);
+ }
+ if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) {
+ // not fenced yet
+ stateBits |= STATE_FENCED_BIT;
+ needFlushHeader = true;
+ synchronized (this) {
+ setChanged();
+ }
+ returnVal = true;
+ }
}
+ notifyObservers(new
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+ return returnVal;
}
// flush the header when header is changed
@@ -279,11 +314,28 @@ class FileInfo {
return rc;
}
- public int read(ByteBuffer bb, long position) throws IOException {
- return readAbsolute(bb, position + START_OF_DATA);
+ public int read(ByteBuffer bb, long position, boolean bestEffort)
+ throws IOException {
+ return readAbsolute(bb, position + START_OF_DATA, bestEffort);
}
- private int readAbsolute(ByteBuffer bb, long start) throws IOException {
+ /**
+ * Read data from position <i>start</i> to fill the byte buffer <i>bb</i>.
+ * If <i>bestEffort </i> is provided, it would return when it reaches EOF.
+ * Otherwise, it would throw {@link
org.apache.bookkeeper.bookie.ShortReadException}
+ * if it reaches EOF.
+ *
+ * @param bb
+ * byte buffer of data
+ * @param start
+ * start position to read data
+ * @param bestEffort
+ * flag indicates if it is a best-effort read
+ * @return number of bytes read
+ * @throws IOException
+ */
+ private int readAbsolute(ByteBuffer bb, long start, boolean bestEffort)
+ throws IOException {
checkOpen(false);
synchronized (this) {
if (fc == null) {
@@ -297,7 +349,11 @@ class FileInfo {
rc = fc.read(bb, start);
}
if (rc <= 0) {
- throw new IOException("Short read");
+ if (bestEffort) {
+ return total;
+ } else {
+ throw new ShortReadException("Short read at " +
getLf().getPath() + "@" + start);
+ }
}
total += rc;
// should move read position
@@ -307,23 +363,30 @@ class FileInfo {
}
/**
- * Close a file info
+ * Close a file info. Generally, force should be set to true. If set to
false metadata will not be flushed and
+ * accessing metadata before restart and recovery will be unsafe (since
reloading from the index file will
+ * cause metadata to be lost). Setting force=false helps avoid expensive
file create during shutdown with many
+ * dirty ledgers, and is safe because ledger metadata will be recovered
before being accessed again.
*
* @param force
* if set to true, the index is forced to create before closed,
* if set to false, the index is not forced to create.
*/
- synchronized public void close(boolean force) throws IOException {
- isClosed = true;
- checkOpen(force);
- // Any time when we force close a file, we should try to flush header.
otherwise, we might lose fence bit.
- if (force) {
- flushHeader();
- }
- if (useCount.get() == 0 && fc != null) {
- fc.close();
- fc = null;
+ public void close(boolean force) throws IOException {
+ synchronized (this) {
+ isClosed = true;
+ checkOpen(force, true);
+ // Any time when we force close a file, we should try to flush
header. otherwise, we might lose fence bit.
+ if (force) {
+ flushHeader();
+ }
+ setChanged();
+ if (useCount.get() == 0 && fc != null) {
+ fc.close();
+ fc = null;
+ }
}
+ notifyObservers(new
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
}
synchronized public long write(ByteBuffer[] buffs, long position) throws
IOException {
@@ -429,7 +492,7 @@ class FileInfo {
}
}
- public boolean delete() {
+ public synchronized boolean delete() {
return lf.delete();
}
@@ -443,7 +506,7 @@ class FileInfo {
}
}
- public boolean isSameFile(File f) {
+ public synchronized boolean isSameFile(File f) {
return this.lf.equals(f);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 81c37fd..323ad62 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.bookie;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -28,9 +30,10 @@ import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Observable;
+import java.util.Observer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -41,10 +44,6 @@ import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.buffer.ByteBuf;
-
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS;
@@ -333,6 +332,18 @@ public class IndexPersistenceMgr {
}
}
}
+
+ Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException {
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(ledgerId, null);
+ return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ } finally {
+ if (null != fi) {
+ fi.release();
+ }
+ }
+ }
long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
FileInfo fi = null;
@@ -626,7 +637,14 @@ public class IndexPersistenceMgr {
if (position < 0) {
position = 0;
}
- fi.read(bb, position);
+ // we read the last page from file size minus page size, so it
should not encounter short read
+ // exception. if it does, it is an unexpected situation, then
throw the exception and fail it immediately.
+ try {
+ fi.read(bb, position, false);
+ } catch (ShortReadException sre) {
+ // throw a more meaningful exception with ledger id
+ throw new ShortReadException("Short read on ledger " +
ledgerId + " : ", sre);
+ }
bb.flip();
long startingEntryId = position /
LedgerEntryPage.getIndexEntrySize();
for (int i = entriesPerPage - 1; i >= 0; i--) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index b8a6e53..3d84877 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -21,24 +21,23 @@
package org.apache.bookkeeper.bookie;
+import com.google.common.collect.Lists;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
-
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import java.util.Map;
-import java.util.NavigableMap;
-
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -49,8 +48,6 @@ import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
@@ -264,6 +261,12 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer) throws IOException {
+ return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId,
previoisLAC, observer);
+ }
+
+
+ @Override
synchronized public long addEntry(ByteBuf entry) throws IOException {
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
new file mode 100644
index 0000000..81cd842
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+public class LastAddConfirmedUpdateNotification {
+ public long lastAddConfirmed;
+ public long timestamp;
+
+ public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
+ this.lastAddConfirmed = lastAddConfirmed;
+ this.timestamp = System.currentTimeMillis();
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index c55592f..26d5245 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -21,10 +21,11 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
-
-import io.netty.buffer.ByteBuf;
+import java.util.Observable;
+import java.util.Observer;
/**
* This class maps a ledger entry number into a location (entrylogid, offset)
in
@@ -48,6 +49,7 @@ interface LedgerCache extends Closeable {
Long getLastAddConfirmed(long ledgerId) throws IOException;
long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
+ Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException;
void deleteLedger(long ledgerId) throws IOException;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 8f1c56f..5709ce6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,6 +87,12 @@ public class LedgerCacheImpl implements LedgerCache {
}
@Override
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer) throws IOException {
+ return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId,
previoisLAC, observer);
+ }
+
+
+ @Override
public void putEntryOffset(long ledger, long entry, long offset) throws
IOException {
indexPageManager.putEntryOffset(ledger, entry, offset);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 9fe1629..032dfe2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
/**
* Implements a ledger inside a bookie. In particular, it implements operations
@@ -58,6 +60,7 @@ public abstract class LedgerDescriptor {
abstract ByteBuf readEntry(long entryId) throws IOException;
abstract long getLastAddConfirmed() throws IOException;
+ abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC,
Observer observer) throws IOException;
abstract void setExplicitLac(ByteBuf entry) throws IOException;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index a1e0fc0..c2246bf 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -22,10 +22,10 @@
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
import java.util.Arrays;
-
+import java.util.Observable;
+import java.util.Observer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,4 +101,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
long getLastAddConfirmed() throws IOException {
return ledgerStorage.getLastAddConfirmed(ledgerId);
}
+
+ @Override
+ Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer
observer) throws IOException {
+ return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId,
previoisLAC, observer);
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index 2d6f80d..5aee2fe 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -21,18 +21,22 @@
package org.apache.bookkeeper.bookie;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.util.ZeroBuffer;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.ZeroBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a page in the LedgerCache. It holds the locations
* (entrylogfile, offset) for entry ids.
*/
public class LedgerEntryPage {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LedgerEntryPage.class);
+
private final static int indexEntrySize = 8;
private final int pageSize;
private final int entriesPerPage;
@@ -153,11 +157,24 @@ public class LedgerEntryPage {
public void readPage(FileInfo fi) throws IOException {
checkPage();
page.clear();
- while(page.remaining() != 0) {
- if (fi.read(page, getFirstEntryPosition()) <= 0) {
- throw new IOException("Short page read of ledger " +
getLedger()
- + " tried to get " + page.capacity() + " from
position " + getFirstEntryPosition()
- + " still need " + page.remaining());
+ try {
+ fi.read(page, getFirstEntryPosition(), true);
+ } catch (ShortReadException sre) {
+ throw new ShortReadException("Short page read of ledger " +
getLedger()
+ + " tried to get " + page.capacity() + " from position "
+ + getFirstEntryPosition() + " still need " +
page.remaining(), sre);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("IllegalArgumentException when trying to read ledger {}
from position {}"
+ , new Object[]{getLedger(), getFirstEntryPosition(), iae});
+ throw iae;
+ }
+ // make sure we don't include partial index entry
+ if (page.remaining() != 0) {
+ LOG.info("Short page read of ledger {} : tried to read {} bytes
from position {}, but only {} bytes read.",
+ new Object[] { getLedger(), page.capacity(),
getFirstEntryPosition(), page.position() });
+ if (page.position() % indexEntrySize != 0) {
+ int partialIndexEntryStart = page.position() - page.position()
% indexEntrySize;
+ page.putLong(partialIndexEntryStart, 0L);
}
}
last = getLastEntryIndex();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 9d2161e..9fc9390 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -22,9 +22,9 @@
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
-
+import java.util.Observable;
+import java.util.Observer;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -118,6 +118,16 @@ public interface LedgerStorage {
long getLastAddConfirmed(long ledgerId) throws IOException;
/**
+ * Wait for last add confirmed update.
+ *
+ * @param previoisLAC - The threshold beyond which we would wait for the
update
+ * @param observer - Observer to notify on update
+ * @return
+ * @throws IOException
+ */
+ Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException;
+
+ /**
* Flushes all data in the storage. Once this is called,
* add data written to the LedgerStorage up until this point
* has been persisted to perminant storage
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
new file mode 100644
index 0000000..302cc46
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+
+/**
+ * Short Read Exception. Used to distinguish short read exception with other
{@link java.io.IOException}s.
+ */
+public class ShortReadException extends IOException {
+
+ private static final long serialVersionUID = -4201771547564923223L;
+
+ public ShortReadException(String msg) {
+ super(msg);
+ }
+
+ public ShortReadException(String msg, Throwable t) {
+ super(msg, t);
+ }
+
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
index 05067d0..8f5bdea 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java
@@ -139,6 +139,22 @@ public interface AsyncCallback {
void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx);
}
+ public interface ReadLastConfirmedAndEntryCallback {
+ /**
+ * Callback definition for bookie operation that allows reading the
last add confirmed
+ * along with an entry within the last add confirmed range
+ *
+ * @param rc Return code
+ * @param lastConfirmed The entry id of the last confirmed write or
+ * {@link LedgerHandle#INVALID_ENTRY_ID
INVALID_ENTRY_ID}
+ * if no entry has been confirmed
+ * @param entry The entry since the lastAddConfirmed entry that was
specified when the request
+ * was initiated
+ * @param ctx context object
+ */
+ void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed,
LedgerEntry entry, Object ctx);
+ }
+
public interface RecoverCallback {
/**
* Callback definition for bookie recover operations
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 383fe3f..7895af2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -99,6 +99,8 @@ public class BookKeeper implements AutoCloseable {
private OpStatsLogger deleteOpLogger;
private OpStatsLogger recoverOpLogger;
private OpStatsLogger readOpLogger;
+ private OpStatsLogger readLacAndEntryOpLogger;
+ private OpStatsLogger readLacAndEntryRespLogger;
private OpStatsLogger addOpLogger;
private OpStatsLogger writeLacOpLogger;
private OpStatsLogger readLacOpLogger;
@@ -137,8 +139,10 @@ public class BookKeeper implements AutoCloseable {
final ClientConfiguration conf;
final int explicitLacInterval;
final boolean delayEnsembleChange;
+ final boolean reorderReadSequence;
final Optional<SpeculativeRequestExecutionPolicy>
readSpeculativeRequestPolicy;
+ final Optional<SpeculativeRequestExecutionPolicy>
readLACSpeculativeRequestPolicy;
// Close State
boolean closed = false;
@@ -301,6 +305,7 @@ public class BookKeeper implements AutoCloseable {
throws IOException, InterruptedException, KeeperException {
this.conf = conf;
this.delayEnsembleChange = conf.getDelayEnsembleChange();
+ this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
// initialize zookeeper client
if (zkc == null) {
@@ -374,6 +379,15 @@ public class BookKeeper implements AutoCloseable {
this.readSpeculativeRequestPolicy =
Optional.<SpeculativeRequestExecutionPolicy>absent();
}
+ if (conf.getFirstSpeculativeReadLACTimeout() > 0) {
+ this.readLACSpeculativeRequestPolicy =
+ Optional.of((SpeculativeRequestExecutionPolicy)(new
DefaultSpeculativeRequestExecutionPolicy(
+ conf.getFirstSpeculativeReadLACTimeout(),
+ conf.getMaxSpeculativeReadLACTimeout(),
+
conf.getSpeculativeReadLACTimeoutBackoffMultiplier())));
+ } else {
+ this.readLACSpeculativeRequestPolicy =
Optional.<SpeculativeRequestExecutionPolicy>absent();
+ }
// initialize main worker pool
this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
.name("BookKeeperClientWorker")
@@ -504,6 +518,10 @@ public class BookKeeper implements AutoCloseable {
return readSpeculativeRequestPolicy;
}
+    /**
+     * Returns the speculative request policy applied to read-LAC-and-entry (long poll) requests.
+     *
+     * @return the configured policy, or {@link Optional#absent()} when
+     *         {@code firstSpeculativeReadLACTimeout} is not positive
+     */
+    public Optional<SpeculativeRequestExecutionPolicy> getReadLACSpeculativeRequestPolicy() {
+        return readLACSpeculativeRequestPolicy;
+    }
+
/**
* Get the BookieClient, currently used for doing bookie recovery.
*
@@ -1275,6 +1293,8 @@ public class BookKeeper implements AutoCloseable {
openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP);
recoverOpLogger =
stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP);
readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP);
+ readLacAndEntryOpLogger =
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY);
+ readLacAndEntryRespLogger =
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
writeLacOpLogger =
stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
readLacOpLogger =
stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
@@ -1287,6 +1307,8 @@ public class BookKeeper implements AutoCloseable {
OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; }
OpStatsLogger getRecoverOpLogger() { return recoverOpLogger; }
OpStatsLogger getReadOpLogger() { return readOpLogger; }
+ OpStatsLogger getReadLacAndEntryOpLogger() { return
readLacAndEntryOpLogger; }
+ OpStatsLogger getReadLacAndEntryRespLogger() { return
readLacAndEntryRespLogger; }
OpStatsLogger getAddOpLogger() { return addOpLogger; }
OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; }
OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; }
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index e98b2d6..15c6248 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -39,6 +39,8 @@ public interface BookKeeperClientStats {
public final static String READ_OP = "READ_ENTRY";
public final static String WRITE_LAC_OP = "WRITE_LAC";
public final static String READ_LAC_OP = "READ_LAC";
+ public final static String READ_LAST_CONFIRMED_AND_ENTRY =
"READ_LAST_CONFIRMED_AND_ENTRY";
+ public final static String READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE =
"READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE";
public final static String PENDING_ADDS = "NUM_PENDING_ADD";
public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS";
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index bd6c7d6..a056043 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
@@ -39,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -78,6 +82,7 @@ public class LedgerHandle implements AutoCloseable {
final DigestManager macManager;
final DistributionSchedule distributionSchedule;
final RateLimiter throttler;
+ final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
final boolean enableParallelRecoveryRead;
final int recoveryReadBatchSize;
@@ -138,6 +143,13 @@ public class LedgerHandle implements AutoCloseable {
this.ledgerKey = password.length > 0 ?
MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
distributionSchedule = new RoundRobinDistributionSchedule(
metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(),
metadata.getEnsembleSize());
+ this.bookieFailureHistory = CacheBuilder.newBuilder()
+
.expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(),
TimeUnit.MILLISECONDS)
+ .build(new CacheLoader<BookieSocketAddress, Long>() {
+ public Long load(BookieSocketAddress key) {
+ return -1L;
+ }
+ });
ensembleChangeCounter =
bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
lacUpdateHitsCounter =
bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
@@ -957,6 +969,81 @@ public class LedgerHandle implements AutoCloseable {
new TryReadLastConfirmedOp(this, innercb,
getLastAddConfirmed()).initiate();
}
+
+    /**
+     * Asynchronously reads the next entry together with the latest last add confirmed (LAC).
+     *
+     * <p>If {@code entryId} is less than or equal to the known last add confirmed, the entry is
+     * read directly. If {@code entryId} is ahead of the known last add confirmed, a long poll read
+     * is issued to wait for entry {@code entryId}.
+     *
+     * <p>If the ledger is closed and {@code entryId} is beyond its last entry id, the callback is
+     * invoked immediately with {@link BKException.Code#OK}, the ledger's last entry id and a
+     * {@code null} entry.
+     *
+     * <p>On success the callback receives the latest last add confirmed and the entry (which may
+     * be {@code null} if it did not become available within {@code timeOutInMillis}). On failure
+     * it receives {@link #INVALID_ENTRY_ID} as the last add confirmed and a {@code null} entry.
+     *
+     * @param entryId
+     *          next entry id to read
+     * @param timeOutInMillis
+     *          timeout period to wait for the entry id to be available (for long poll only)
+     * @param parallel
+     *          whether to issue the long poll reads in parallel
+     * @param cb
+     *          callback to return the result
+     * @param ctx
+     *          callback context
+     */
+    public void asyncReadLastConfirmedAndEntry(final long entryId,
+                                               final long timeOutInMillis,
+                                               final boolean parallel,
+                                               final AsyncCallback.ReadLastConfirmedAndEntryCallback cb,
+                                               final Object ctx) {
+        boolean isClosed;
+        long lac;
+        synchronized (this) {
+            isClosed = metadata.isClosed();
+            lac = metadata.getLastEntryId();
+        }
+        if (isClosed) {
+            if (entryId > lac) {
+                // Ledger is closed and entryId is past its last entry: report the last entry id, no entry.
+                cb.readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx);
+                return;
+            }
+        } else {
+            lac = getLastAddConfirmed();
+        }
+        if (entryId <= lac) {
+            // The entry is already covered by the known LAC, so a plain read is sufficient.
+            asyncReadEntries(entryId, entryId, new ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (BKException.Code.OK == rc) {
+                        LedgerEntry entry = seq.hasMoreElements() ? seq.nextElement() : null;
+                        cb.readLastConfirmedAndEntryComplete(rc, getLastAddConfirmed(), entry, ctx);
+                    } else {
+                        cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx);
+                    }
+                }
+            }, ctx);
+            return;
+        }
+        // wait for entry <i>entryId</i> via a long poll read
+        ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
+            new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
+                // Ensures the user callback is invoked at most once.
+                final AtomicBoolean completed = new AtomicBoolean(false);
+
+                @Override
+                public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) {
+                    if (!completed.compareAndSet(false, true)) {
+                        return;
+                    }
+                    if (rc == BKException.Code.OK) {
+                        cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry, ctx);
+                    } else {
+                        cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx);
+                    }
+                }
+            };
+        new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.scheduler)
+            .parallelRead(parallel)
+            .initiate();
+    }
+
/**
* Context objects for synchronous call to read last confirmed.
*/
@@ -1545,6 +1632,13 @@ public class LedgerHandle implements AutoCloseable {
bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
}
+    /**
+     * Records that an operation on {@code entryId} failed on {@code bookie}.
+     *
+     * <p>The failure is stored in {@code bookieFailureHistory} only when bookie failure tracking is
+     * enabled in the client configuration. Entries in that history expire after the configured
+     * {@code bookieFailureHistoryExpirationMSec}; the history is passed to the placement policy
+     * when read sequences are reordered.
+     *
+     * @param bookie the bookie on which the operation failed
+     * @param entryId the entry id of the failed operation
+     */
+    void registerOperationFailureOnBookie(BookieSocketAddress bookie, long entryId) {
+        if (!bk.getConf().getEnableBookieFailureTracking()) {
+            return;
+        }
+        bookieFailureHistory.put(bookie, entryId);
+    }
+
+
void recover(GenericCallback<Void> finalCb) {
recover(finalCb, null, false);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index f2477c1..c820122 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -92,7 +92,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
super(lId, eId);
this.ensemble = ensemble;
- this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+
+ if (lh.bk.reorderReadSequence) {
+ this.writeSet =
lh.bk.placementPolicy.reorderReadSequence(ensemble,
+ lh.distributionSchedule.getWriteSet(entryId),
lh.bookieFailureHistory.asMap());
+ } else {
+ this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ }
}
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
similarity index 51%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index f2477c1..2fab694 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -1,4 +1,4 @@
-/*
+/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -21,63 +21,47 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Enumeration;
-import java.util.HashSet;
import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
-import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Sequence of entries of a ledger that represents a pending read operation.
- * When all the data read has come back, the application callback is called.
- * This class could be improved because we could start pushing data to the
- * application as soon as it arrives rather than waiting for the whole thing.
- *
- */
-class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
- private static final Logger LOG =
LoggerFactory.getLogger(PendingReadOp.class);
+public class ReadLastConfirmedAndEntryOp implements
BookkeeperInternalCallbacks.ReadEntryCallback, SpeculativeRequestExectuor {
+ static final Logger LOG =
LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class);
final private ScheduledExecutorService scheduler;
- private ScheduledFuture<?> speculativeTask = null;
- Queue<LedgerEntryRequest> seq;
- Set<BookieSocketAddress> heardFromHosts;
- BitSet heardFromHostsBitSet;
- ReadCallback cb;
- Object ctx;
- LedgerHandle lh;
- long numPendingEntries;
- long startEntryId;
- long endEntryId;
- long requestTimeNanos;
- OpStatsLogger readOpLogger;
-
+ ReadLACAndEntryRequest request;
+ final BitSet heardFromHostsBitSet;
+ final BitSet emptyResponsesFromHostsBitSet;
final int maxMissedReadsAllowed;
boolean parallelRead = false;
- final AtomicBoolean complete = new AtomicBoolean(false);
+ final AtomicBoolean requestComplete = new AtomicBoolean(false);
+
+ final long requestTimeNano;
+ private final LedgerHandle lh;
+ private final LastConfirmedAndEntryCallback cb;
- abstract class LedgerEntryRequest extends LedgerEntry implements
SpeculativeRequestExectuor {
+ private int numResponsesPending;
+ private final int numEmptyResponsesAllowed;
+ private volatile boolean hasValidResponse = false;
+ private final long prevEntryId;
+ private long lastAddConfirmed;
+ private long timeOutInMillis;
+
+ abstract class ReadLACAndEntryRequest extends LedgerEntry {
final AtomicBoolean complete = new AtomicBoolean(false);
@@ -87,12 +71,23 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
final ArrayList<BookieSocketAddress> ensemble;
final List<Integer> writeSet;
+ final List<Integer> orderedEnsemble;
- LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
+ ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long
lId, long eId) {
super(lId, eId);
this.ensemble = ensemble;
this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+ if (lh.bk.reorderReadSequence) {
+ this.orderedEnsemble =
lh.bk.placementPolicy.reorderReadLACSequence(ensemble,
+ writeSet, lh.bookieFailureHistory.asMap());
+ } else {
+ this.orderedEnsemble = writeSet;
+ }
+ }
+
+ synchronized int getFirstError() {
+ return firstError;
}
/**
@@ -112,11 +107,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
* @return return true if we managed to complete the entry;
* otherwise return false if the read entry is not complete or
it is already completed before
*/
- boolean complete(int bookieIndex, BookieSocketAddress host, final
ByteBuf buffer) {
+ boolean complete(int bookieIndex, BookieSocketAddress host, final
ByteBuf buffer, long entryId) {
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(entryId,
buffer);
- } catch (BKDigestMatchException e) {
+ } catch (BKException.BKDigestMatchException e) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
buffer.release();
return false;
@@ -124,6 +119,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
if (!complete.getAndSet(true)) {
rc = BKException.Code.OK;
+ this.entryId = entryId;
/*
* The length is a long and it is the last field of the
metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to
get the length.
@@ -132,7 +128,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
data = content;
return true;
} else {
- buffer.release();
return false;
}
}
@@ -147,13 +142,28 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
boolean fail(int rc) {
if (complete.compareAndSet(false, true)) {
this.rc = rc;
- submitCallback(rc);
+ translateAndSetFirstError(rc);
+ completeRequest();
return true;
} else {
return false;
}
}
+        /**
+         * Folds {@code rc} into {@code firstError}, the code reported when the request completes
+         * without any valid response.
+         *
+         * <p>{@code firstError} is overwritten while it is still OK, NoSuchEntry or NoSuchLedgerExists.
+         * A recorded BookieHandleNotAvailable is replaced only by a code other than
+         * NoSuchEntry/NoSuchLedgerExists. Any other recorded error is kept.
+         *
+         * @param rc the result code to record
+         */
+        private synchronized void translateAndSetFirstError(int rc) {
+            if (BKException.Code.OK == firstError
+                    || BKException.Code.NoSuchEntryException == firstError
+                    || BKException.Code.NoSuchLedgerExistsException == firstError) {
+                firstError = rc;
+            } else if (BKException.Code.BookieHandleNotAvailableException == firstError
+                    && BKException.Code.NoSuchEntryException != rc
+                    && BKException.Code.NoSuchLedgerExistsException != rc) {
+                // if an exception other than NoSuchEntryException or NoSuchLedgerExistsException is
+                // returned we need to update firstError to indicate that it might be a valid read
+                // that just failed.
+                firstError = rc;
+            }
+        }
+
/**
* Log error <i>errMsg</i> and reattempt read from <i>host</i>.
*
@@ -167,30 +177,22 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
* read result code
*/
synchronized void logErrorAndReattemptRead(int bookieIndex,
BookieSocketAddress host, String errMsg, int rc) {
- if (BKException.Code.OK == firstError ||
- BKException.Code.NoSuchEntryException == firstError ||
- BKException.Code.NoSuchLedgerExistsException == firstError) {
- firstError = rc;
- } else if (BKException.Code.BookieHandleNotAvailableException ==
firstError &&
- BKException.Code.NoSuchEntryException != rc &&
- BKException.Code.NoSuchLedgerExistsException != rc) {
- // if other exception rather than NoSuchEntryException or
NoSuchLedgerExistsException is
- // returned we need to update firstError to indicate that it
might be a valid read but just
- // failed.
- firstError = rc;
- }
+ translateAndSetFirstError(rc);
+
if (BKException.Code.NoSuchEntryException == rc ||
BKException.Code.NoSuchLedgerExistsException == rc) {
- ++numMissedEntryReads;
- if (LOG.isDebugEnabled()) {
- LOG.debug("No such entry found on bookie. L{} E{} bookie:
{}",
- new Object[] { lh.ledgerId, entryId, host });
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(errMsg + " while reading L{} E{} from bookie:
{}",
- new Object[]{lh.ledgerId, entryId, host});
+ // Since we send all long poll requests to every available
node, we should only
+ // treat these errors as failures if the node from which we
received this is part of
+ // the writeSet
+ if (this.writeSet.contains(bookieIndex)) {
+ lh.registerOperationFailureOnBookie(host, entryId);
}
+ ++numMissedEntryReads;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(errMsg + " while reading entry: " + entryId + "
ledgerId: " + lh.ledgerId + " from bookie: "
+ + host);
}
}
@@ -226,43 +228,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
public String toString() {
return String.format("L%d-E%d", ledgerId, entryId);
}
-
- /**
- * Issues a speculative request and indicates if more speculative
- * requests should be issued
- *
- * @return whether more speculative requests should be issued
- */
- @Override
- public ListenableFuture<Boolean> issueSpeculativeRequest() {
- return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new
Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- if (!isComplete() && null !=
maybeSendSpeculativeRead(heardFromHostsBitSet)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send speculative read for {}. Hosts
heard are {}, ensemble is {}.",
- new Object[] { this, heardFromHostsBitSet,
ensemble });
- }
- return true;
- }
- return false;
- }
- });
- }
}
- class ParallelReadRequest extends LedgerEntryRequest {
+ class ParallelReadRequest extends ReadLACAndEntryRequest {
int numPendings;
ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
super(ensemble, lId, eId);
- numPendings = writeSet.size();
+ numPendings = orderedEnsemble.size();
}
@Override
void read() {
- for (int bookieIndex : writeSet) {
+ for (int bookieIndex : orderedEnsemble) {
BookieSocketAddress to = ensemble.get(bookieIndex);
try {
sendReadTo(bookieIndex, to, this);
@@ -297,18 +276,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
}
- class SequenceReadRequest extends LedgerEntryRequest {
+ class SequenceReadRequest extends ReadLACAndEntryRequest {
final static int NOT_FOUND = -1;
int nextReplicaIndexToReadFrom = 0;
final BitSet sentReplicas;
final BitSet erroredReplicas;
+ final BitSet emptyResponseReplicas;
SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId,
long eId) {
super(ensemble, lId, eId);
- this.sentReplicas = new
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
- this.erroredReplicas = new
BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+ this.sentReplicas = new BitSet(orderedEnsemble.size());
+ this.erroredReplicas = new BitSet(orderedEnsemble.size());
+ this.emptyResponseReplicas = new BitSet(orderedEnsemble.size());
}
private synchronized int getNextReplicaIndexToReadFrom() {
@@ -316,7 +297,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
private int getReplicaIndex(int bookieIndex) {
- return writeSet.indexOf(bookieIndex);
+ return orderedEnsemble.indexOf(bookieIndex);
}
private BitSet getSentToBitSet() {
@@ -324,14 +305,14 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
for (int i = 0; i < sentReplicas.length(); i++) {
if (sentReplicas.get(i)) {
- b.set(writeSet.get(i));
+ b.set(orderedEnsemble.get(i));
}
}
return b;
}
private boolean readsOutstanding() {
- return (sentReplicas.cardinality() -
erroredReplicas.cardinality()) > 0;
+ return (sentReplicas.cardinality() - erroredReplicas.cardinality()
- emptyResponseReplicas.cardinality()) > 0;
}
/**
@@ -341,7 +322,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
*/
@Override
synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet
heardFrom) {
- if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getWriteQuorumSize()) {
+ if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getEnsembleSize()) {
return null;
}
@@ -363,7 +344,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
synchronized BookieSocketAddress sendNextRead() {
- if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getWriteQuorumSize()) {
+ if (nextReplicaIndexToReadFrom >=
getLedgerMetadata().getEnsembleSize()) {
// we are done, the read has failed from all replicas, just
fail the
// read
@@ -379,7 +360,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
int replica = nextReplicaIndexToReadFrom;
- int bookieIndex =
lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
+ int bookieIndex = orderedEnsemble.get(nextReplicaIndexToReadFrom);
nextReplicaIndexToReadFrom++;
try {
@@ -404,7 +385,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
LOG.error("Received error from a host which is not in the
ensemble {} {}.", host, ensemble);
return;
}
- erroredReplicas.set(replica);
+
+ if (BKException.Code.OK == rc) {
+ emptyResponseReplicas.set(replica);
+ } else {
+ erroredReplicas.set(replica);
+ }
if (!readsOutstanding()) {
sendNextRead();
@@ -413,78 +399,101 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
}
- PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
- long startEntryId, long endEntryId, ReadCallback cb, Object
ctx) {
- seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId +
1) - startEntryId));
- this.cb = cb;
- this.ctx = ctx;
+ ReadLastConfirmedAndEntryOp(LedgerHandle lh,
+ LastConfirmedAndEntryCallback cb,
+ long prevEntryId,
+ long timeOutInMillis,
+ ScheduledExecutorService scheduler) {
this.lh = lh;
- this.startEntryId = startEntryId;
- this.endEntryId = endEntryId;
+ this.cb = cb;
+ this.prevEntryId = prevEntryId;
+ this.lastAddConfirmed = lh.getLastAddConfirmed();
+ this.timeOutInMillis = timeOutInMillis;
+ this.numResponsesPending = 0;
+ this.numEmptyResponsesAllowed =
getLedgerMetadata().getWriteQuorumSize()
+ - getLedgerMetadata().getAckQuorumSize() + 1;
+ this.requestTimeNano = MathUtils.nowInNano();
this.scheduler = scheduler;
- numPendingEntries = endEntryId - startEntryId + 1;
- maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
- - getLedgerMetadata().getAckQuorumSize();
- heardFromHosts = new HashSet<>();
+ maxMissedReadsAllowed = getLedgerMetadata().getEnsembleSize()
+ - getLedgerMetadata().getAckQuorumSize();
heardFromHostsBitSet = new
BitSet(getLedgerMetadata().getEnsembleSize());
-
- readOpLogger = lh.bk.getReadOpLogger();
+ emptyResponsesFromHostsBitSet = new
BitSet(getLedgerMetadata().getEnsembleSize());
}
protected LedgerMetadata getLedgerMetadata() {
return lh.metadata;
}
- protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) {
- if (speculativeTask != null) {
- speculativeTask.cancel(mayInterruptIfRunning);
- speculativeTask = null;
- }
- }
-
- PendingReadOp parallelRead(boolean enabled) {
+ ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) {
this.parallelRead = enabled;
return this;
}
- public void initiate() {
- long nextEnsembleChange = startEntryId, i = startEntryId;
- this.requestTimeNanos = MathUtils.nowInNano();
- ArrayList<BookieSocketAddress> ensemble = null;
-
- do {
- if (i == nextEnsembleChange) {
- ensemble = getLedgerMetadata().getEnsemble(i);
- nextEnsembleChange =
getLedgerMetadata().getNextEnsembleChange(i);
- }
- LedgerEntryRequest entry;
- if (parallelRead) {
- entry = new ParallelReadRequest(ensemble, lh.ledgerId, i);
- } else {
- entry = new SequenceReadRequest(ensemble, lh.ledgerId, i);
- }
- seq.add(entry);
- i++;
- } while (i <= endEntryId);
- // read the entries.
- for (LedgerEntryRequest entry : seq) {
- entry.read();
- if (!parallelRead &&
lh.bk.getReadSpeculativeRequestPolicy().isPresent()) {
-
lh.bk.getReadSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
entry);
+ /**
+ * Speculative Read Logic
+ */
+ @Override
+ public ListenableFuture<Boolean> issueSpeculativeRequest() {
+ return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new
Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ if (!requestComplete.get() && !request.isComplete() &&
+ (null !=
request.maybeSendSpeculativeRead(heardFromHostsBitSet))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Send speculative ReadLAC {} for ledger {}
(previousLAC: {}). Hosts heard are {}.",
+ new Object[] {request, lh.getId(),
lastAddConfirmed, heardFromHostsBitSet });
+ }
+ return true;
+ }
+ return false;
}
+ });
+ }
+
+ public void initiate() {
+ if (parallelRead) {
+ request = new ParallelReadRequest(lh.metadata.currentEnsemble,
lh.ledgerId, prevEntryId + 1);
+ } else {
+ request = new SequenceReadRequest(lh.metadata.currentEnsemble,
lh.ledgerId, prevEntryId + 1);
}
+ request.read();
+
+ if (!parallelRead &&
lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) {
+
lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler,
this);
+ }
+ }
+
+ void sendReadTo(int bookieIndex, BookieSocketAddress to,
ReadLACAndEntryRequest entry) throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Calling Read LAC and Entry with {} and long polling
interval {} on Bookie {} - Parallel {}",
+ new Object[] { prevEntryId, timeOutInMillis, to,
parallelRead });
+ }
+ lh.bk.bookieClient.readEntryWaitForLACUpdate(to,
+ lh.ledgerId,
+ BookieProtocol.LAST_ADD_CONFIRMED,
+ prevEntryId,
+ timeOutInMillis,
+ true,
+ this, new ReadLastConfirmedAndEntryContext(bookieIndex, to));
+ this.numResponsesPending++;
}
- private static class ReadContext implements ReadEntryCallbackCtx {
+    /**
+     * Callback invoked when a {@link ReadLastConfirmedAndEntryOp} completes.
+     */
+    interface LastConfirmedAndEntryCallback {
+        /**
+         * Reports the outcome of the read-LAC-and-entry operation.
+         *
+         * @param rc result code of the operation
+         * @param lastAddConfirmed the highest last add confirmed the operation observed
+         * @param entry the entry that was read, or {@code null} if no entry was read
+         */
+        void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry);
+    }
+
+ public static class ReadLastConfirmedAndEntryContext implements
BookkeeperInternalCallbacks.ReadEntryCallbackCtx {
final int bookieIndex;
- final BookieSocketAddress to;
- final LedgerEntryRequest entry;
+ final BookieSocketAddress bookie;
long lac = LedgerHandle.INVALID_ENTRY_ID;
+ Optional<Long> lacUpdateTimestamp = Optional.absent();
- ReadContext(int bookieIndex, BookieSocketAddress to,
LedgerEntryRequest entry) {
+ ReadLastConfirmedAndEntryContext(int bookieIndex, BookieSocketAddress
bookie) {
this.bookieIndex = bookieIndex;
- this.to = to;
- this.entry = entry;
+ this.bookie = bookie;
}
@Override
@@ -496,84 +505,116 @@ class PendingReadOp implements Enumeration<LedgerEntry>,
ReadEntryCallback {
public long getLastAddConfirmed() {
return lac;
}
- }
- void sendReadTo(int bookieIndex, BookieSocketAddress to,
LedgerEntryRequest entry) throws InterruptedException {
- if (lh.throttler != null) {
- lh.throttler.acquire();
+ public Optional<Long> getLacUpdateTimestamp() {
+ return lacUpdateTimestamp;
}
- lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
- this, new ReadContext(bookieIndex, to,
entry));
- }
-
- @Override
- public void readEntryComplete(int rc, long ledgerId, final long entryId,
final ByteBuf buffer, Object ctx) {
- final ReadContext rctx = (ReadContext)ctx;
- final LedgerEntryRequest entry = rctx.entry;
-
- if (rc != BKException.Code.OK) {
- entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error:
" + BKException.getMessage(rc), rc);
- return;
+ public void setLacUpdateTimestamp(long lacUpdateTimestamp) {
+ this.lacUpdateTimestamp = Optional.of(lacUpdateTimestamp);
}
- heardFromHosts.add(rctx.to);
- heardFromHostsBitSet.set(rctx.bookieIndex, true);
- if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
- lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
- submitCallback(BKException.Code.OK);
- }
+ }
- if(numPendingEntries < 0)
- LOG.error("Read too many values for ledger {} : [{}, {}].", new
Object[] { ledgerId,
- startEntryId, endEntryId });
+ private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry
entry) {
+ long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano);
+ if (BKException.Code.OK != rc) {
+ lh.bk.getReadLacAndEntryOpLogger()
+ .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ } else {
+ lh.bk.getReadLacAndEntryOpLogger()
+ .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS);
+ }
+ cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry);
}
- protected void submitCallback(int code) {
- if (BKException.Code.OK == code) {
- numPendingEntries--;
- if (numPendingEntries != 0) {
- return;
+ @Override
+ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf
buffer, Object ctx) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} received response for (lid={}, eid={}) : {}",
+ new Object[] { getClass().getName(), ledgerId, entryId, rc });
+ }
+ ReadLastConfirmedAndEntryContext rCtx =
(ReadLastConfirmedAndEntryContext) ctx;
+ BookieSocketAddress bookie = rCtx.bookie;
+ numResponsesPending--;
+ if (BKException.Code.OK == rc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received lastAddConfirmed (lac={}) from bookie({})
for (lid={}).",
+ new Object[] { rCtx.getLastAddConfirmed(), bookie,
ledgerId });
}
- }
- // ensure callback once
- if (!complete.compareAndSet(false, true)) {
- return;
- }
+ if (rCtx.getLastAddConfirmed() > lastAddConfirmed) {
+ lastAddConfirmed = rCtx.getLastAddConfirmed();
+ lh.updateLastConfirmed(rCtx.getLastAddConfirmed(), 0L);
+ }
+
+ hasValidResponse = true;
- long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
- if (code != BKException.Code.OK) {
- long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
- for (LedgerEntryRequest req : seq) {
- if (!req.isComplete()) {
- firstUnread = req.getEntryId();
- break;
+ if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) {
+ if (request.complete(rCtx.bookieIndex, bookie, buffer,
entryId)) {
+ // callback immediately
+ if (rCtx.getLacUpdateTimestamp().isPresent()) {
+ long elapsedMicros =
TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis() -
rCtx.getLacUpdateTimestamp().get());
+ elapsedMicros = Math.max(elapsedMicros, 0);
+ lh.bk.getReadLacAndEntryRespLogger()
+ .registerSuccessfulEvent(elapsedMicros,
TimeUnit.MICROSECONDS);
+ }
+
+ submitCallback(BKException.Code.OK, lastAddConfirmed,
request);
+ requestComplete.set(true);
+ heardFromHostsBitSet.set(rCtx.bookieIndex, true);
}
+ } else {
+ emptyResponsesFromHostsBitSet.set(rCtx.bookieIndex, true);
+ if (lastAddConfirmed > prevEntryId) {
+ // received advanced lac
+ completeRequest();
+ } else if(emptyResponsesFromHostsBitSet.cardinality() >=
numEmptyResponsesAllowed) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed readLACAndEntry(lid = {},
previousEntryId = {}) after received {} empty responses ('{}').",
+ new Object[]{ledgerId, prevEntryId,
emptyResponsesFromHostsBitSet.cardinality(), emptyResponsesFromHostsBitSet});
+ }
+ completeRequest();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received empty response for
readLACAndEntry(lid = {}, previousEntryId = {}) from" +
+ " bookie {} @ {}, reattempting reading
next bookie : lac = {}",
+ new Object[]{ledgerId, prevEntryId,
rCtx.bookieIndex,
+ rCtx.bookie, lastAddConfirmed});
+ }
+ request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie,
"Empty Response", rc);
+ }
+ return;
}
- LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}
: bitset = {}. First unread entry is {}",
- new Object[] { lh.getId(), startEntryId, endEntryId,
heardFromHosts, heardFromHostsBitSet, firstUnread });
- readOpLogger.registerFailedEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ } else if (BKException.Code.UnauthorizedAccessException == rc &&
!requestComplete.get()) {
+ submitCallback(rc, lastAddConfirmed, null);
+ requestComplete.set(true);
} else {
- readOpLogger.registerSuccessfulEvent(latencyNanos,
TimeUnit.NANOSECONDS);
+ request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie, "Error:
" + BKException.getMessage(rc), rc);
+ return;
+ }
+
+ if (numResponsesPending <= 0) {
+ completeRequest();
}
- cancelSpeculativeTask(true);
- cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
- cb = null;
}
- @Override
- public boolean hasMoreElements() {
- return !seq.isEmpty();
+ private void completeRequest() {
+ if (requestComplete.compareAndSet(false, true)) {
+ if (!hasValidResponse) {
+ // no success called
+ submitCallback(request.getFirstError(), lastAddConfirmed,
null);
+ } else {
+ // callback
+ submitCallback(BKException.Code.OK, lastAddConfirmed, null);
+ }
+ }
}
@Override
- public LedgerEntry nextElement() throws NoSuchElementException {
- return seq.remove();
+ public String toString() { // e.g. "ReadLastConfirmedAndEntryOp(lid=1, prevEntryId=5)"
+ return String.format("ReadLastConfirmedAndEntryOp(lid=%d,
prevEntryId=%d)", lh.ledgerId, prevEntryId);
}
- public int size() {
- return seq.size();
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 038437a..157c6b5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -20,7 +20,9 @@ package org.apache.bookkeeper.conf;
import static com.google.common.base.Charsets.UTF_8;
import static
org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -66,8 +68,12 @@ public class ClientConfiguration extends
AbstractConfiguration {
protected final static String FIRST_SPECULATIVE_READ_TIMEOUT =
"firstSpeculativeReadTimeout";
protected final static String MAX_SPECULATIVE_READ_TIMEOUT =
"maxSpeculativeReadTimeout";
protected final static String SPECULATIVE_READ_TIMEOUT_BACKOFF_MULTIPLIER
= "speculativeReadTimeoutBackoffMultiplier";
+ protected final static String FIRST_SPECULATIVE_READ_LAC_TIMEOUT =
"firstSpeculativeReadLACTimeout";
+ protected final static String MAX_SPECULATIVE_READ_LAC_TIMEOUT =
"maxSpeculativeReadLACTimeout";
+ protected final static String
SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER =
"speculativeReadLACTimeoutBackoffMultiplier";
protected final static String ENABLE_PARALLEL_RECOVERY_READ =
"enableParallelRecoveryRead";
protected final static String RECOVERY_READ_BATCH_SIZE =
"recoveryReadBatchSize";
+ protected final static String REORDER_READ_SEQUENCE_ENABLED =
"reorderReadSequenceEnabled";
// Add Parameters
protected final static String DELAY_ENSEMBLE_CHANGE =
"delayEnsembleChange";
// Timeout Setting
@@ -103,7 +109,11 @@ public class ClientConfiguration extends
AbstractConfiguration {
// Stats
protected final static String ENABLE_TASK_EXECUTION_STATS =
"enableTaskExecutionStats";
protected final static String TASK_EXECUTION_WARN_TIME_MICROS =
"taskExecutionWarnTimeMicros";
-
+
+ // Failure History Settings
+ protected final static String ENABLE_BOOKIE_FAILURE_TRACKING =
"enableBookieFailureTracking";
+ protected final static String BOOKIE_FAILURE_HISTORY_EXPIRATION_MS =
"bookieFailureHistoryExpirationMSec";
+
// Names of dynamic features
protected final static String DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME =
"disableEnsembleChangeFeatureName";
@@ -846,6 +856,27 @@ public class ClientConfiguration extends
AbstractConfiguration {
}
/**
+ * Multiplier to use when determining time between successive speculative
read LAC requests
+ *
+ * @return speculative read LAC timeout backoff multiplier. Default 2.0.
+ */
+ public float getSpeculativeReadLACTimeoutBackoffMultiplier() {
+ return getFloat(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER, 2.0f);
+ }
+
+ /**
+ * Set the multiplier to use when determining time between successive
speculative read LAC requests
+ *
+ * @param speculativeReadLACTimeoutBackoffMultiplier
+ * multiplier to use when determining time between successive
speculative read LAC requests.
+ * @return client configuration.
+ */
+ public ClientConfiguration
setSpeculativeReadLACTimeoutBackoffMultiplier(float
speculativeReadLACTimeoutBackoffMultiplier) {
+ setProperty(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER,
speculativeReadLACTimeoutBackoffMultiplier);
+ return this;
+ }
+
+ /**
* Get the max speculative read timeout.
*
* @return max speculative read timeout.
@@ -867,6 +898,66 @@ public class ClientConfiguration extends
AbstractConfiguration {
}
/**
+ * Get the period of time after which the first speculative read last add
confirmed and entry
+ * should be triggered.
+ * A speculative entry request is sent to the next replica bookie before
+ * an error or response has been received for the previous entry read
request.
+ *
+ * A speculative entry read is only sent if we have not heard from the
current
+ * replica bookie during the entire read operation which may comprise
many entries.
+ *
+ * Speculative requests allow the client to avoid having to wait for the
connect timeout
+ * in the case that a bookie has failed. It induces higher load on the
network and on
+ * bookies. This should be taken into account before changing this
configuration value.
+ *
+ * @return the speculative request timeout in milliseconds. Default 1500.
+ */
+ public int getFirstSpeculativeReadLACTimeout() {
+ return getInt(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, 1500);
+ }
+
+
+ /**
+ * Get the maximum interval between successive speculative read last add
confirmed and entry
+ * requests.
+ *
+ * @return the max speculative request timeout in milliseconds. Default
5000.
+ */
+ public int getMaxSpeculativeReadLACTimeout() {
+ return getInt(MAX_SPECULATIVE_READ_LAC_TIMEOUT, 5000);
+ }
+
+ /**
+ * Set the period of time after which the first speculative read last add
confirmed and entry
+ * should be triggered.
+ * A lower timeout will reduce read latency in the case of a failed bookie,
+ * while increasing the load on bookies and the network.
+ *
+ * The default is 1500 milliseconds. A value of 0 will disable speculative
reads
+ * completely.
+ *
+ * @see #getFirstSpeculativeReadLACTimeout()
+ * @param timeout the timeout value, in milliseconds
+ * @return client configuration
+ */
+ public ClientConfiguration setFirstSpeculativeReadLACTimeout(int timeout) {
+ setProperty(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, timeout);
+ return this;
+ }
+
+ /**
+ * Set the maximum interval between successive speculative read last add
confirmed and entry
+ * requests.
+ *
+ * @param timeout the timeout value, in milliseconds
+ * @return client configuration
+ */
+ public ClientConfiguration setMaxSpeculativeReadLACTimeout(int timeout) {
+ setProperty(MAX_SPECULATIVE_READ_LAC_TIMEOUT, timeout);
+ return this;
+ }
+
+ /**
* Whether to enable parallel reading in recovery read.
*
* @return true if enable parallel reading in recovery read. otherwise,
return false.
@@ -909,6 +1000,34 @@ public class ClientConfiguration extends
AbstractConfiguration {
}
/**
+ * If reorder read sequence enabled or not.
+ *
+ * @return true if reorder read sequence is enabled, otherwise false.
+ */
+ public boolean isReorderReadSequenceEnabled() {
+ return getBoolean(REORDER_READ_SEQUENCE_ENABLED, false);
+ }
+
+ /**
+ * Enable/disable reordering read sequence on reading entries.
+ *
+ * <p>If this flag is enabled, the client will use
+ * {@link EnsemblePlacementPolicy#reorderReadSequence(ArrayList, List,
Map)}
+ * to figure out a better read sequence to attempt reads from replicas and
use
+ * {@link EnsemblePlacementPolicy#reorderReadLACSequence(ArrayList, List,
Map)}
+ * to figure out a better read sequence to attempt long poll reads from
replicas.
+ *
+ * <p>The order of read sequence is determined by the placement policy
implementations.
+ *
+ * @param enabled the flag to enable/disable reorder read sequence.
+ * @return client configuration instance.
+ */
+ public ClientConfiguration setReorderReadSequenceEnabled(boolean enabled) {
+ setProperty(REORDER_READ_SEQUENCE_ENABLED, enabled);
+ return this;
+ }
+
+ /**
* Get Ensemble Placement Policy Class.
*
* @return ensemble placement policy class.
@@ -1240,6 +1359,48 @@ public class ClientConfiguration extends
AbstractConfiguration {
}
/**
+ * Whether to enable bookie failure tracking. Default true.
+ *
+ * @return flag to enable/disable bookie failure tracking
+ */
+ public boolean getEnableBookieFailureTracking() {
+ return getBoolean(ENABLE_BOOKIE_FAILURE_TRACKING, true);
+ }
+
+ /**
+ * Enable/Disable bookie failure tracking.
+ *
+ * @param enabled
+ * flag to enable/disable bookie failure tracking
+ * @return client configuration.
+ */
+ public ClientConfiguration setEnableBookieFailureTracking(boolean enabled)
{
+ setProperty(ENABLE_BOOKIE_FAILURE_TRACKING, enabled);
+ return this;
+ }
+
+ /**
+ * Get the bookie failure tracking expiration timeout, in milliseconds.
+ *
+ * @return bookie failure tracking expiration timeout. Default 60000.
+ */
+ public int getBookieFailureHistoryExpirationMSec() {
+ return getInt(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, 60000);
+ }
+
+ /**
+ * Set the bookie failure tracking expiration timeout, in milliseconds.
+ *
+ * @param expirationMSec
+ * bookie failure tracking expiration timeout.
+ * @return client configuration.
+ */
+ public ClientConfiguration setBookieFailureHistoryExpirationMSec(int
expirationMSec) {
+ setProperty(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, expirationMSec);
+ return this;
+ }
+
+ /**
* Get the name of the dynamic feature that disables ensemble change
*
* @return name of the dynamic feature that disables ensemble change
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index b4dd066..e5c96e8 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -115,6 +115,11 @@ public class ServerConfiguration extends
AbstractConfiguration {
// Worker Thread parameters.
protected final static String NUM_ADD_WORKER_THREADS =
"numAddWorkerThreads";
protected final static String NUM_READ_WORKER_THREADS =
"numReadWorkerThreads";
+ protected final static String NUM_LONG_POLL_WORKER_THREADS =
"numLongPollWorkerThreads";
+
+ // Long poll parameters
+ protected final static String REQUEST_TIMER_TICK_DURATION_MILLISEC =
"requestTimerTickDurationMs";
+ protected final static String REQUEST_TIMER_NO_OF_TICKS =
"requestTimerNumTicks";
protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes";
protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes";
@@ -1131,6 +1136,26 @@ public class ServerConfiguration extends
AbstractConfiguration {
}
/**
+ * Set the number of threads that should handle long poll requests.
+ *
+ * @param numThreads
+ * number of threads to handle long poll requests.
+ * @return server configuration
+ */
+ public ServerConfiguration setNumLongPollWorkerThreads(int numThreads) {
+ setProperty(NUM_LONG_POLL_WORKER_THREADS, numThreads);
+ return this;
+ }
+
+ /**
+ * Get the number of threads that should handle long poll requests.
+ * @return the number of long poll worker threads. Default 10.
+ */
+ public int getNumLongPollWorkerThreads() {
+ return getInt(NUM_LONG_POLL_WORKER_THREADS, 10);
+ }
+
+ /**
* Set the number of threads that would handle read requests.
*
* @param numThreads
@@ -1148,6 +1173,46 @@ public class ServerConfiguration extends
AbstractConfiguration {
public int getNumReadWorkerThreads() {
return getInt(NUM_READ_WORKER_THREADS, 8);
}
+
+ /**
+ * Set the tick duration in milliseconds.
+ *
+ * @param tickDuration
+ * tick duration in milliseconds.
+ * @return server configuration
+ */
+ public ServerConfiguration setRequestTimerTickDurationMs(int tickDuration)
{
+ setProperty(REQUEST_TIMER_TICK_DURATION_MILLISEC, tickDuration);
+ return this;
+ }
+
+ /**
+ * Get the tick duration in milliseconds.
+ * @return the request timer tick duration in milliseconds. Default 10.
+ */
+ public int getRequestTimerTickDurationMs() {
+ return getInt(REQUEST_TIMER_TICK_DURATION_MILLISEC, 10);
+ }
+
+ /**
+ * Set the number of ticks per wheel for the request timer.
+ *
+ * @param tickCount
+ * number of ticks per wheel for the request timer.
+ * @return server configuration
+ */
+ public ServerConfiguration setRequestTimerNumTicks(int tickCount) {
+ setProperty(REQUEST_TIMER_NO_OF_TICKS, tickCount);
+ return this;
+ }
+
+ /**
+ * Get the number of ticks per wheel for the request timer.
+ * @return the number of ticks per wheel for the request timer. Default 1024.
+ */
+ public int getRequestTimerNumTicks() {
+ return getInt(REQUEST_TIMER_NO_OF_TICKS, 1024);
+ }
/**
* Get the number of bytes used as capacity for the write buffer. Default
is
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 1376048..4cbd814 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -364,17 +364,7 @@ public class BookieClient implements
PerChannelBookieClientFactory {
@Override
public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
- try {
- executor.submitOrdered(ledgerId, new
SafeRunnable() {
- @Override
- public void safeRun() {
- cb.readEntryComplete(rc, ledgerId,
entryId, null, ctx);
- }
- });
- } catch (RejectedExecutionException re) {
-
cb.readEntryComplete(getRc(BKException.Code.InterruptedException),
- ledgerId, entryId, null, ctx);
- }
+ completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
pcbc.readEntry(ledgerId, entryId, cb, ctx);
@@ -384,6 +374,40 @@ public class BookieClient implements
PerChannelBookieClientFactory {
closeLock.readLock().unlock();
}
}
+
+
+ public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
+ final long ledgerId,
+ final long entryId,
+ final long previousLAC,
+ final long timeOutInMillis,
+ final boolean piggyBackEntry,
+ final ReadEntryCallback cb,
+ final Object ctx) {
+ closeLock.readLock().lock();
+ try {
+ final PerChannelBookieClientPool client = lookupClient(addr,
entryId);
+ if (client == null) {
+
completeRead(BKException.Code.BookieHandleNotAvailableException,
+ ledgerId, entryId, null, cb, ctx);
+ return;
+ }
+
+ client.obtain(new GenericCallback<PerChannelBookieClient>() {
+ @Override
+ public void operationComplete(final int rc,
PerChannelBookieClient pcbc) {
+
+ if (rc != BKException.Code.OK) {
+ completeRead(rc, ledgerId, entryId, null, cb, ctx);
+ return;
+ }
+ pcbc.readEntryWaitForLACUpdate(ledgerId, entryId,
previousLAC, timeOutInMillis, piggyBackEntry, cb, ctx);
+ }
+ }, ledgerId);
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
public void getBookieInfo(final BookieSocketAddress addr, final long
requested, final GetBookieInfoCallback cb, final Object ctx) {
closeLock.readLock().lock();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index ce5972e..1f20425 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -20,14 +20,18 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -49,6 +53,7 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST;
+import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
@@ -79,6 +84,17 @@ public class BookieRequestProcessor implements
RequestProcessor {
*/
private final OrderedSafeExecutor writeThreadPool;
+ /**
+ * The threadpool used to execute all long poll requests issued to this
server
+ * after they are done waiting
+ */
+ private final OrderedSafeExecutor longPollThreadPool;
+
+ /**
+ * The Timer used to time out requests for long polling
+ */
+ private final HashedWheelTimer requestTimer;
+
// Expose Stats
private final BKStats bkStats = BKStats.getInstance();
private final boolean statsEnabled;
@@ -94,6 +110,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
final OpStatsLogger longPollWaitStats;
final OpStatsLogger longPollReadStats;
final OpStatsLogger longPollReadRequestStats;
+ final Counter readLastEntryNoEntryErrorCounter;
final OpStatsLogger writeLacRequestStats;
final OpStatsLogger writeLacStats;
final OpStatsLogger readLacRequestStats;
@@ -108,6 +125,15 @@ public class BookieRequestProcessor implements
RequestProcessor {
this.bookie = bookie;
this.readThreadPool =
createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" +
serverCfg.getBookiePort());
this.writeThreadPool =
createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" +
serverCfg.getBookiePort());
+ this.longPollThreadPool =
+ createExecutor(
+ this.serverCfg.getNumLongPollWorkerThreads(),
+ "BookieLongPollThread-" + serverCfg.getBookiePort());
+ this.requestTimer = new HashedWheelTimer(
+ new
ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
+ this.serverCfg.getRequestTimerTickDurationMs(),
+ TimeUnit.MILLISECONDS, this.serverCfg.getRequestTimerNumTicks());
+
// Expose Stats
this.statsEnabled = serverCfg.isStatisticsEnabled();
this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY);
@@ -122,6 +148,7 @@ public class BookieRequestProcessor implements
RequestProcessor {
this.longPollWaitStats =
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT);
this.longPollReadStats =
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ);
this.longPollReadRequestStats =
statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST);
+ this.readLastEntryNoEntryErrorCounter =
statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR);
this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
this.writeLacRequestStats =
statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST);
this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
@@ -231,11 +258,29 @@ public class BookieRequestProcessor implements
RequestProcessor {
private void processReadRequestV3(final BookkeeperProtocol.Request r,
final Channel c) {
ExecutorService fenceThreadPool =
null == readThreadPool ? null : readThreadPool.chooseThread(c);
- ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this,
fenceThreadPool);
- if (null == readThreadPool) {
- read.run();
+ ExecutorService lpThreadPool =
+ null == longPollThreadPool ? null :
longPollThreadPool.chooseThread(c);
+ ReadEntryProcessorV3 read;
+ if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) {
+ read = new LongPollReadEntryProcessorV3(
+ r,
+ c,
+ this,
+ fenceThreadPool,
+ lpThreadPool,
+ requestTimer);
+ if (null == longPollThreadPool) {
+ read.run();
+ } else {
+
longPollThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read);
+ }
} else {
- readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(),
read);
+ read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool);
+ if (null == readThreadPool) {
+ read.run();
+ } else {
+ readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(),
read);
+ }
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
new file mode 100644
index 0000000..342e788
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import io.netty.channel.Channel;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor handling long poll read entry request.
+ */
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements
Observer {
+
+ private final static Logger logger =
LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
+
+ private final Long previousLAC;
+ private Optional<Long> lastAddConfirmedUpdateTime = Optional.absent();
+
+ // long poll execution state
+ private final ExecutorService longPollThreadPool;
+ private final HashedWheelTimer requestTimer;
+ private Timeout expirationTimerTask = null;
+ private Future<?> deferredTask = null;
+ private boolean shouldReadEntry = false;
+
+ LongPollReadEntryProcessorV3(Request request,
+ Channel channel,
+ BookieRequestProcessor requestProcessor,
+ ExecutorService fenceThreadPool,
+ ExecutorService longPollThreadPool,
+ HashedWheelTimer requestTimer) {
+ super(request, channel, requestProcessor, fenceThreadPool);
+ this.previousLAC = readRequest.getPreviousLAC();
+ this.longPollThreadPool = longPollThreadPool;
+ this.requestTimer = requestTimer;
+
+ }
+
+ @Override
+ protected Long getPreviousLAC() {
+ return previousLAC;
+ }
+
+ private synchronized boolean shouldReadEntry() {
+ return shouldReadEntry;
+ }
+
+ @Override // piggyback-aware read: returns the known LAC, optionally with the entry after previousLAC
+ protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder,
+ long entryId,
+ Stopwatch startTimeSw)
+ throws IOException {
+ if (RequestUtils.shouldPiggybackEntry(readRequest)) { // presumably checks the ENTRY_PIGGYBACK flag - confirm
+ if(!readRequest.hasPreviousLAC() ||
(BookieProtocol.LAST_ADD_CONFIRMED != entryId)) {
+ // This is not a valid request - client bug?
+ logger.error("Incorrect read request, entry piggyback
requested incorrectly for ledgerId {} entryId {}",
+ ledgerId, entryId);
+ return buildResponse(readResponseBuilder, StatusCode.EBADREQ,
startTimeSw);
+ } else {
+ long knownLAC =
requestProcessor.bookie.readLastAddConfirmed(ledgerId);
+ readResponseBuilder.setMaxLAC(knownLAC); // always report the bookie's known LAC
+ if (knownLAC > previousLAC) {
+ entryId = previousLAC + 1; // piggyback the first entry past the client's previous LAC
+ readResponseBuilder.setMaxLAC(knownLAC); // NOTE(review): redundant, maxLAC was already set above
+ if (lastAddConfirmedUpdateTime.isPresent()) {
+
readResponseBuilder.setLacUpdateTimestamp(lastAddConfirmedUpdateTime.get());
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("ReadLAC Piggy Back reading entry:{} from
ledger: {}", entryId, ledgerId);
+ }
+ try {
+ return super.readEntry(readResponseBuilder, entryId,
true, startTimeSw);
+ } catch (Bookie.NoEntryException e) {
+
requestProcessor.readLastEntryNoEntryErrorCounter.inc();
+ logger.info("No entry found while piggyback reading
entry {} from ledger {} : previous lac = {}",
+ new Object[] { entryId, ledgerId, previousLAC
});
+ // piggy back is best effort and this request can fail
genuinely because of striping
+ // entries across the ensemble
+ return buildResponse(readResponseBuilder,
StatusCode.EOK, startTimeSw);
+ }
+ } else { // LAC has not advanced: respond with the LAC only
+ if (knownLAC < previousLAC) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Found smaller lac when piggy back
reading lac and entry from ledger {} :" +
+ " previous lac = {}, known lac = {}",
+ new Object[]{ ledgerId, previousLAC,
knownLAC });
+ }
+ }
+ return buildResponse(readResponseBuilder, StatusCode.EOK,
startTimeSw);
+ }
+ }
+ } else { // no piggyback requested: plain read of the requested entry
+ return super.readEntry(readResponseBuilder, entryId, false,
startTimeSw);
+ }
+ }
+
+ private ReadResponse buildErrorResponse(StatusCode statusCode, Stopwatch
sw) {
+ ReadResponse.Builder builder = ReadResponse.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
+ return buildResponse(builder, statusCode, sw);
+ }
+
+ private ReadResponse getLongPollReadResponse() {
+ if (!shouldReadEntry() && readRequest.hasTimeOut()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Waiting For LAC Update {}", previousLAC);
+ }
+
+ final Stopwatch startTimeSw = Stopwatch.createStarted();
+
+ final Observable observable;
+ try {
+ observable =
requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC,
this);
+ } catch (Bookie.NoLedgerException e) {
+ logger.info("No ledger found while longpoll reading ledger {},
previous lac = {}.",
+ ledgerId, previousLAC);
+ return buildErrorResponse(StatusCode.ENOLEDGER, startTimeSw);
+ } catch (IOException ioe) {
+ logger.error("IOException while longpoll reading ledger {},
previous lac = {} : ",
+ new Object[] { ledgerId, previousLAC, ioe });
+ return buildErrorResponse(StatusCode.EIO, startTimeSw);
+ }
+
+ registerSuccessfulEvent(requestProcessor.longPollPreWaitStats,
startTimeSw);
+ lastPhaseStartTime.reset().start();
+
+ if (null != observable) {
+ // successfully registered observable to lac updates
+ if (logger.isTraceEnabled()) {
+ logger.trace("Waiting For LAC Update {}: Timeout {}",
previousLAC, readRequest.getTimeOut());
+ }
+ synchronized (this) {
+ expirationTimerTask = requestTimer.newTimeout(new
TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ // When the timeout expires just get whatever is
the current
+ // readLastConfirmed
+
LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
+ }
+ }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+ }
+ // request doesn't have timeout or fail to wait, proceed to read entry
+ return getReadResponse();
+ }
+
+ @Override
+ protected void executeOp() {
+ ReadResponse readResponse = getLongPollReadResponse();
+ if (null != readResponse) {
+ sendResponse(readResponse);
+ }
+ }
+
+ @Override
+ public void update(Observable observable, Object o) {
+ LastAddConfirmedUpdateNotification newLACNotification =
(LastAddConfirmedUpdateNotification)o;
+ if (newLACNotification.lastAddConfirmed > previousLAC) {
+ if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
+ !lastAddConfirmedUpdateTime.isPresent()) {
+ lastAddConfirmedUpdateTime =
Optional.of(newLACNotification.timestamp);
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Last Add Confirmed Advanced to {} for request
{}",
+ newLACNotification.lastAddConfirmed, request);
+ }
+ scheduleDeferredRead(observable, false);
+ }
+ }
+
+ private synchronized void scheduleDeferredRead(Observable observable,
boolean timeout) {
+ if (null == deferredTask) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Deferred Task, expired: {}, request: {}",
timeout, request);
+ }
+ observable.deleteObserver(this);
+ try {
+ shouldReadEntry = true;
+ deferredTask = longPollThreadPool.submit(this);
+ } catch (RejectedExecutionException exc) {
+ // If the threadPool has been shutdown, simply drop the task
+ }
+ if (null != expirationTimerTask) {
+ expirationTimerTask.cancel();
+ }
+
+ registerEvent(timeout, requestProcessor.longPollWaitStats,
lastPhaseStartTime);
+ lastPhaseStartTime.reset().start();
+ }
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 2ec567b..7ecb0b4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -66,6 +66,7 @@ import com.google.protobuf.ByteString;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -728,7 +729,36 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
}
- public void readEntry(final long ledgerId, final long entryId,
ReadEntryCallback cb, Object ctx) {
+ /**
+ * Long Poll Reads
+ */
+ public void readEntryWaitForLACUpdate(final long ledgerId,
+ final long entryId,
+ final long previousLAC,
+ final long timeOutInMillis,
+ final boolean piggyBackEntry,
+ ReadEntryCallback cb,
+ Object ctx) {
+ readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis,
piggyBackEntry, cb, ctx);
+ }
+
+ /**
+ * Normal Reads.
+ */
+ public void readEntry(final long ledgerId,
+ final long entryId,
+ ReadEntryCallback cb,
+ Object ctx) {
+ readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx);
+ }
+
+ private void readEntryInternal(final long ledgerId,
+ final long entryId,
+ final Long previousLAC,
+ final Long timeOutInMillis,
+ final boolean piggyBackEntry,
+ final ReadEntryCallback cb,
+ final Object ctx) {
Object request = null;
CompletionKey completion = null;
if (useV2WireProtocol) {
@@ -749,6 +779,30 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.setLedgerId(ledgerId)
.setEntryId(entryId);
+ if (null != previousLAC) {
+ readBuilder = readBuilder.setPreviousLAC(previousLAC);
+ }
+
+ if (null != timeOutInMillis) {
+ // Long poll requires previousLAC
+ if (null == previousLAC) {
+
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+ ledgerId, entryId, null, ctx);
+ return;
+ }
+ readBuilder = readBuilder.setTimeOut(timeOutInMillis);
+ }
+
+ if (piggyBackEntry) {
+ // Long poll requires previousLAC
+ if (null == previousLAC) {
+
cb.readEntryComplete(BKException.Code.IncorrectParameterException,
+ ledgerId, entryId, null, ctx);
+ return;
+ }
+ readBuilder =
readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK);
+ }
+
request = Request.newBuilder()
.setHeader(headerBuilder)
.setReadRequest(readBuilder)
@@ -1210,7 +1264,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (readResponse.hasData()) {
data = readResponse.getData();
}
- handleReadResponse(ledgerId, entryId, status,
data, INVALID_ENTRY_ID, completionValue);
+ handleReadResponse(ledgerId, entryId, status,
data, INVALID_ENTRY_ID, -1L, completionValue);
break;
}
default:
@@ -1302,7 +1356,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (readResponse.hasMaxLAC()) {
maxLAC = readResponse.getMaxLAC();
}
- handleReadResponse(readResponse.getLedgerId(),
readResponse.getEntryId(), status, buffer, maxLAC, completionValue);
+ long lacUpdateTimestamp = -1L;
+ if (readResponse.hasLacUpdateTimestamp()) {
+ lacUpdateTimestamp =
readResponse.getLacUpdateTimestamp();
+ }
+ handleReadResponse(readResponse.getLedgerId(),
readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp,
completionValue);
break;
}
case WRITE_LAC: {
@@ -1416,6 +1474,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
StatusCode status,
ByteBuf buffer,
long maxLAC, // max known lac piggy-back from
bookies
+ long lacUpdateTimestamp, // the timestamp when the
lac is updated.
CompletionValue completionValue) {
// The completion value should always be an instance of a
ReadCompletion object when we reach here.
ReadCompletion rc = (ReadCompletion)completionValue;
@@ -1440,6 +1499,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof
ReadEntryCallbackCtx)) {
((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC);
}
+ if (lacUpdateTimestamp > -1L && (rc.ctx instanceof
ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext)) {
+ ((ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext)
rc.ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
+ }
rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 6be898d..0febdc7 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -24,6 +24,8 @@ import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
@@ -339,6 +341,11 @@ public class TestSyncThread {
}
@Override
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer) throws IOException {
+ return null; // stub: the observer is never registered
+ }
+
+ @Override
public Checkpoint checkpoint(Checkpoint checkpoint)
throws IOException {
return checkpoint;
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
new file mode 100644
index 0000000..96c5c08
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
@@ -0,0 +1,293 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link LedgerHandle#asyncReadLastConfirmedAndEntry}: reading the next
+ * entry together with an advanced last-add-confirmed (LAC).
+ */
+public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
+
+    final BookKeeper.DigestType digestType;
+
+    public TestReadLastConfirmedAndEntry() {
+        super(3);
+        this.digestType = BookKeeper.DigestType.CRC32;
+    }
+
+    /**
+     * A bookie that misbehaves when asked for one specific entry id: it either
+     * stalls the read for a long time or fails it with a NoEntryException.
+     */
+    static class FakeBookie extends Bookie {
+
+        final long expectedEntryToFail;
+        // true: stall the read; false: respond with NoEntryException
+        final boolean stallOrRespondNull;
+
+        public FakeBookie(ServerConfiguration conf, long expectedEntryToFail, boolean stallOrRespondNull)
+                throws InterruptedException, BookieException, KeeperException, IOException {
+            super(conf);
+            this.expectedEntryToFail = expectedEntryToFail;
+            this.stallOrRespondNull = stallOrRespondNull;
+        }
+
+        @Override
+        public ByteBuf readEntry(long ledgerId, long entryId)
+                throws IOException, NoLedgerException {
+            if (entryId == expectedEntryToFail) {
+                if (stallOrRespondNull) {
+                    try {
+                        Thread.sleep(600000);
+                    } catch (InterruptedException e) {
+                        // the stall is best-effort; restore the interrupt status for the caller
+                        Thread.currentThread().interrupt();
+                    }
+                } else {
+                    throw new NoEntryException(ledgerId, entryId);
+                }
+            }
+            return super.readEntry(ledgerId, entryId);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testAdvancedLacWithEmptyResponse() throws Exception {
+        byte[] passwd = "advanced-lac-with-empty-response".getBytes(UTF_8);
+
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        newConf.setAddEntryTimeout(9999999);
+        newConf.setReadEntryTimeout(9999999);
+
+        // stop existing bookies
+        stopAllBookies();
+        // add fake bookies: bookie 0 answers NoEntry for the failing entry, the others stall on it
+        long expectedEntryIdToFail = 2;
+        for (int i = 0; i < numBookies; i++) {
+            ServerConfiguration conf = newServerConfiguration();
+            Bookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0);
+            bs.add(startBookie(conf, b));
+            bsConfs.add(conf);
+        }
+
+        // create bookkeeper
+        BookKeeper newBk = new BookKeeper(newConf);
+        // create ledger to write some data
+        LedgerHandle lh = newBk.createLedger(3, 3, 2, digestType, passwd);
+        for (int i = 0; i <= expectedEntryIdToFail; i++) {
+            lh.addEntry("test".getBytes(UTF_8));
+        }
+
+        // open ledger to tail reading
+        LedgerHandle newLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd);
+        long lac = newLh.readLastConfirmed();
+        assertEquals(expectedEntryIdToFail - 1, lac);
+        Enumeration<LedgerEntry> entries = newLh.readEntries(0, lac);
+
+        int numReads = 0;
+        long expectedEntryId = 0L;
+        while (entries.hasMoreElements()) {
+            LedgerEntry entry = entries.nextElement();
+            assertEquals(expectedEntryId++, entry.getEntryId());
+            ++numReads;
+        }
+        assertEquals(lac + 1, numReads);
+
+        final AtomicInteger rcHolder = new AtomicInteger(-12345);
+        final AtomicLong lacHolder = new AtomicLong(lac);
+        final AtomicReference<LedgerEntry> entryHolder = new AtomicReference<LedgerEntry>(null);
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        newLh.asyncReadLastConfirmedAndEntry(newLh.getLastAddConfirmed() + 1, 99999, false,
+                new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+                    @Override
+                    public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
+                        rcHolder.set(rc);
+                        lacHolder.set(lastConfirmed);
+                        entryHolder.set(entry);
+                        latch.countDown();
+                    }
+                }, null);
+
+        lh.addEntry("another test".getBytes(UTF_8));
+
+        latch.await();
+        assertEquals(expectedEntryIdToFail, lacHolder.get());
+        assertNull(entryHolder.get());
+        assertEquals(BKException.Code.OK, rcHolder.get());
+
+        lh.close();
+        newLh.close();
+
+        newBk.close();
+    }
+
+    /**
+     * A bookie whose {@code readLastAddConfirmed} blocks on {@code readLatch}
+     * whenever the LAC it reads equals {@code lacToSlowRead}.
+     */
+    static class SlowReadLacBookie extends Bookie {
+
+        private final long lacToSlowRead;
+        private final CountDownLatch readLatch;
+
+        public SlowReadLacBookie(ServerConfiguration conf,
+                                 long lacToSlowRead, CountDownLatch readLatch)
+                throws IOException, KeeperException, InterruptedException, BookieException {
+            super(conf);
+            this.lacToSlowRead = lacToSlowRead;
+            this.readLatch = readLatch;
+        }
+
+        @Override
+        public long readLastAddConfirmed(long ledgerId) throws IOException {
+            long lac = super.readLastAddConfirmed(ledgerId);
+            logger.info("Last Add Confirmed for ledger {} is {}", ledgerId, lac);
+            if (lacToSlowRead == lac) {
+                logger.info("Suspend returning lac {} for ledger {}", lac, ledgerId);
+                try {
+                    readLatch.await();
+                } catch (InterruptedException e) {
+                    // stop suspending, but restore the interrupt status for the caller
+                    Thread.currentThread().interrupt();
+                }
+            }
+            // re-read after a possible suspension, so the returned LAC may be newer than
+            // the one logged above (presumably what testRaceOnLastAddConfirmed relies on)
+            return super.readLastAddConfirmed(ledgerId);
+        }
+    }
+
+    /**
+     * Callback that records the result of a readLastConfirmedAndEntry call and
+     * lets the test block until it arrives.
+     */
+    static class ReadLastConfirmedAndEntryResult implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
+
+        int rc = -1234;
+        long lac = -1234L;
+        LedgerEntry entry = null;
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+
+        @Override
+        public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
+            this.rc = rc;
+            this.lac = lastConfirmed;
+            this.entry = entry;
+            doneLatch.countDown();
+        }
+
+        void await() throws InterruptedException {
+            doneLatch.await();
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testRaceOnLastAddConfirmed() throws Exception {
+        byte[] passwd = "race-on-last-add-confirmed".getBytes(UTF_8);
+
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        newConf.setAddEntryTimeout(9999999);
+        newConf.setReadEntryTimeout(9999999);
+
+        final long lacToSlowRead = 0L;
+        final CountDownLatch readLatch = new CountDownLatch(1);
+
+        // stop first bookie
+        ServerConfiguration bsConf = killBookie(0);
+        // start it with a slow bookie
+        Bookie b = new SlowReadLacBookie(bsConf, lacToSlowRead, readLatch);
+        bs.add(startBookie(bsConf, b));
+        bsConfs.add(bsConf);
+
+        // create bookkeeper
+        BookKeeper newBk = new BookKeeper(newConf);
+        // create ledger
+        LedgerHandle lh = newBk.createLedger(3, 3, 3, digestType, passwd);
+        // 0) write entry 0
+        lh.addEntry("entry-0".getBytes(UTF_8));
+
+        // open ledger to read
+        LedgerHandle readLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd);
+
+        // 1) wait entry 0 to be committed
+        ReadLastConfirmedAndEntryResult readResult = new ReadLastConfirmedAndEntryResult();
+        readLh.asyncReadLastConfirmedAndEntry(0L, 9999999, true, readResult, null);
+
+        // 2) write entry 1 to commit entry 0 => lac = 0
+        lh.addEntry("entry-1".getBytes(UTF_8));
+        readResult.await();
+        assertEquals(BKException.Code.OK, readResult.rc);
+        assertEquals(0L, readResult.lac);
+        assertEquals(0L, readResult.entry.getEntryId());
+        assertEquals("entry-0", new String(readResult.entry.getEntry(), UTF_8));
+
+        // 3) write entry 2 to commit entry 1 => lac = 1
+        lh.addEntry("entry-2".getBytes(UTF_8));
+        // 4) count down read latch to trigger previous readLacAndEntry request
+        readLatch.countDown();
+        // 5) due to piggyback, the lac is updated to lac = 1
+        while (readLh.getLastAddConfirmed() < 1L) {
+            Thread.sleep(100);
+        }
+        // 6) write entry 3 to commit entry 2 => lac = 2
+        lh.addEntry("entry-3".getBytes(UTF_8));
+        // 7) readLastConfirmedAndEntry for next entry (we are expecting to read entry 1)
+        readResult = new ReadLastConfirmedAndEntryResult();
+        readLh.asyncReadLastConfirmedAndEntry(1L, 9999999, true, readResult, null);
+        readResult.await();
+        assertEquals(BKException.Code.OK, readResult.rc);
+        assertEquals(2L, readResult.lac);
+        assertEquals(1L, readResult.entry.getEntryId());
+        assertEquals("entry-1", new String(readResult.entry.getEntry(), UTF_8));
+
+        lh.close();
+        readLh.close();
+
+        newBk.close();
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
new file mode 100644
index 0000000..094fe88
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for reading the last-add-confirmed (LAC) of a ledger that is still being
+ * written, via {@code asyncTryReadLastConfirmed} and {@code asyncReadLastConfirmedAndEntry}.
+ */
+public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
+    final DigestType digestType;
+
+    public TestReadLastConfirmedLongPoll() {
+        super(6);
+        this.digestType = DigestType.CRC32;
+    }
+
+    @Test(timeout = 60000)
+    public void testReadLACLongPollWhenAllBookiesUp() throws Exception {
+        final int numEntries = 3;
+
+        final LedgerHandle lh = bkc.createLedger(3, 3, 1, digestType, "".getBytes());
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+        assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed());
+        // add entries
+        for (int i = 0; i < (numEntries - 1); i++) {
+            lh.addEntry(("data" + i).getBytes());
+        }
+        final AtomicBoolean success = new AtomicBoolean(false);
+        final AtomicInteger numCallbacks = new AtomicInteger(0);
+        final CountDownLatch firstReadComplete = new CountDownLatch(1);
+        readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() {
+            @Override
+            public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                firstReadComplete.countDown();
+            }
+        }, null);
+        firstReadComplete.await();
+        assertTrue(success.get());
+        assertEquals(1, numCallbacks.get());
+        assertEquals(numEntries - 3, readLh.getLastAddConfirmed());
+        // try read last confirmed again
+        success.set(false);
+        numCallbacks.set(0);
+        long entryId = readLh.getLastAddConfirmed() + 1;
+        final CountDownLatch secondReadComplete = new CountDownLatch(1);
+        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, true, new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc && lastConfirmed == (numEntries - 2)) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                secondReadComplete.countDown();
+            }
+        }, null);
+        lh.addEntry(("data" + (numEntries - 1)).getBytes());
+        secondReadComplete.await();
+        assertTrue(success.get());
+        assertEquals(1, numCallbacks.get());
+        assertEquals(numEntries - 2, readLh.getLastAddConfirmed());
+
+        success.set(false);
+        numCallbacks.set(0);
+        final CountDownLatch thirdReadComplete = new CountDownLatch(1);
+        readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, false, new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
+            @Override
+            public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
+                numCallbacks.incrementAndGet();
+                if (BKException.Code.OK == rc && lastConfirmed == (numEntries - 1)) {
+                    success.set(true);
+                } else {
+                    success.set(false);
+                }
+                thirdReadComplete.countDown();
+            }
+        }, null);
+        lh.addEntry(("data" + numEntries).getBytes());
+        thirdReadComplete.await();
+        assertTrue(success.get());
+        assertEquals(1, numCallbacks.get());
+        assertEquals(numEntries - 1, readLh.getLastAddConfirmed());
+        lh.close();
+        readLh.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReadLACLongPollWhenSomeBookiesDown() throws Exception {
+        final int numEntries = 3;
+        final LedgerHandle lh = bkc.createLedger(3, 1, 1, digestType, "".getBytes());
+        LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes());
+        assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed());
+        // add entries
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(("data" + i).getBytes());
+        }
+        for (int i = 0; i < numEntries; i++) {
+            // kill every ensemble member except the i-th one; this relies on
+            // numEntries being equal to the ensemble size (3)
+            ServerConfiguration[] confs = new ServerConfiguration[numEntries - 1];
+            for (int j = 0; j < numEntries - 1; j++) {
+                int idx = (i + 1 + j) % numEntries;
+                confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx));
+            }
+
+            final AtomicBoolean entryAsExpected = new AtomicBoolean(false);
+            final AtomicBoolean success = new AtomicBoolean(false);
+            final AtomicInteger numCallbacks = new AtomicInteger(0);
+            final CountDownLatch readComplete = new CountDownLatch(1);
+            final int entryId = i;
+            readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() {
+                @Override
+                public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
+                    numCallbacks.incrementAndGet();
+                    if (BKException.Code.OK == rc) {
+                        success.set(true);
+                        entryAsExpected.set(lastConfirmed == (entryId - 1));
+                    } else {
+                        System.out.println("Return value" + rc);
+                        success.set(false);
+                        entryAsExpected.set(false);
+                    }
+                    readComplete.countDown();
+                }
+            }, null);
+            readComplete.await();
+            assertTrue(success.get());
+            assertTrue(entryAsExpected.get());
+            assertEquals(1, numCallbacks.get());
+
+            // start the bookies
+            for (ServerConfiguration conf : confs) {
+                bs.add(startBookie(conf));
+                bsConfs.add(conf);
+            }
+        }
+        // close the handles only after the last read: closing lh inside the loop
+        // would close the ledger while readLh is still being queried
+        lh.close();
+        readLh.close();
+    }
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 48b8bab..95a0506 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -38,6 +38,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Observable;
+import java.util.Observer;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -418,5 +420,10 @@ public class GcLedgersTest extends LedgerManagerTestCase {
@Override
public void flushEntriesLocationsIndex() throws IOException {
}
+
+ @Override
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer) throws IOException {
+ return null; // stub: the observer is never registered
+ }
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 747398a..c6f2a36 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -21,12 +21,14 @@
package org.apache.bookkeeper.meta;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
-
+import java.util.Observable;
+import java.util.Observer;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -43,17 +45,12 @@ import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.buffer.ByteBuf;
/**
* Test case to run over serveral ledger managers
*/
@RunWith(Parameterized.class)
public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
- static final Logger LOG =
LoggerFactory.getLogger(LedgerManagerTestCase.class);
protected LedgerManagerFactory ledgerManagerFactory;
protected LedgerManager ledgerManager = null;
@@ -208,14 +205,16 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
}
@Override
- public void setExplicitlac(long ledgerId, ByteBuf lac) throws
IOException {
- // TODO Auto-generated method stub
+ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer) throws IOException {
+ return null; // stub: the observer is never registered
+ }
+ @Override
+ public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
}
@Override
public ByteBuf getExplicitLac(long ledgerId) {
- // TODO Auto-generated method stub
return null;
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 9106841..a2f66bb 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -214,6 +214,19 @@ public abstract class BookKeeperClusterTestCase {
return conf;
}
+    protected void stopAllBookies() throws Exception { // shuts down every server in bs; bsConfs is left intact
+        for (BookieServer runningBookie : bs) {
+            runningBookie.shutdown();
+        }
+        bs.clear();
+    }
+
+    protected void startAllBookies() throws Exception { // starts one bookie per entry of bsConfs, appending to bs
+        for (ServerConfiguration bookieConf : bsConfs) {
+            bs.add(startBookie(bookieConf));
+        }
+    }
+
/**
* Get bookie address for bookie at index
*/
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.