This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new ee0ddde BOOKKEEPER-772: Reorder Read Sequence ee0ddde is described below commit ee0dddee6849d1968500af666571df668d34393a Author: Sijie Guo <si...@apache.org> AuthorDate: Tue Jul 4 12:27:21 2017 +0800 BOOKKEEPER-772: Reorder Read Sequence Descriptions of the changes in this PR: - for rack-aware placement policy, the bookie in the same rack will be preferred. - for region-aware placement policy, the bookie in the same region will be preferred. - for any readonly or unavailable (high score in bookie failure history) bookies, they will be at the last position in the sequence. This change is based on #220. Please review gitsha 1f7dccd for the reorder change. Author: Sijie Guo <si...@apache.org> Reviewers: Enrico Olivelli <None>, Matteo Merli <None> This closes #224 from sijie/reorder_reads --- .../java/org/apache/bookkeeper/bookie/Bookie.java | 7 + .../org/apache/bookkeeper/bookie/FileInfo.java | 155 ++++--- .../bookkeeper/bookie/IndexPersistenceMgr.java | 30 +- .../bookie/InterleavedLedgerStorage.java | 25 +- .../bookie/LastAddConfirmedUpdateNotification.java | 31 ++ .../org/apache/bookkeeper/bookie/LedgerCache.java | 6 +- .../apache/bookkeeper/bookie/LedgerCacheImpl.java | 8 + .../apache/bookkeeper/bookie/LedgerDescriptor.java | 3 + .../bookkeeper/bookie/LedgerDescriptorImpl.java | 9 +- .../apache/bookkeeper/bookie/LedgerEntryPage.java | 33 +- .../apache/bookkeeper/bookie/LedgerStorage.java | 14 +- .../bookkeeper/bookie/ShortReadException.java | 37 ++ .../apache/bookkeeper/client/AsyncCallback.java | 16 + .../org/apache/bookkeeper/client/BookKeeper.java | 22 + .../bookkeeper/client/BookKeeperClientStats.java | 2 + .../org/apache/bookkeeper/client/LedgerHandle.java | 94 +++++ .../apache/bookkeeper/client/PendingReadOp.java | 8 +- ...eadOp.java => ReadLastConfirmedAndEntryOp.java} | 457 +++++++++++---------- .../bookkeeper/conf/ClientConfiguration.java | 163 +++++++- 
.../bookkeeper/conf/ServerConfiguration.java | 65 +++ .../org/apache/bookkeeper/proto/BookieClient.java | 46 ++- .../bookkeeper/proto/BookieRequestProcessor.java | 53 ++- .../proto/LongPollReadEntryProcessorV3.java | 226 ++++++++++ .../bookkeeper/proto/PerChannelBookieClient.java | 68 ++- .../apache/bookkeeper/bookie/TestSyncThread.java | 7 + .../client/TestReadLastConfirmedAndEntry.java | 267 ++++++++++++ .../client/TestReadLastConfirmedLongPoll.java | 169 ++++++++ .../org/apache/bookkeeper/meta/GcLedgersTest.java | 7 + .../bookkeeper/meta/LedgerManagerTestCase.java | 17 +- .../bookkeeper/test/BookKeeperClusterTestCase.java | 13 + 30 files changed, 1744 insertions(+), 314 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 789cf33..d607b53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -45,6 +45,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Observable; +import java.util.Observer; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -1491,6 +1493,11 @@ public class Bookie extends BookieCriticalThread { LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); return handle.getLastAddConfirmed(); } + + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); + return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer); + } // The rest of the code is test stuff static class CounterCallback implements WriteCallback { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index 5aeb385..597fcd3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -23,22 +23,21 @@ package org.apache.bookkeeper.bookie; import static com.google.common.base.Charsets.UTF_8; +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.Observable; +import java.util.Observer; import java.util.concurrent.atomic.AtomicInteger; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - /** * This is the file handle for a ledger's index file that maps entry ids to location. * It is used by LedgerCache. @@ -57,7 +56,7 @@ import io.netty.buffer.Unpooled; * <b>Index page</b> is a fixed-length page, which contains serveral entries which point to the offsets of data stored in entry loggers. 
* </p> */ -class FileInfo { +class FileInfo extends Observable { private final static Logger LOG = LoggerFactory.getLogger(FileInfo.class); static final int NO_MASTER_KEY = -1; @@ -102,14 +101,34 @@ class FileInfo { return lac; } - synchronized long setLastAddConfirmed(long lac) { - if (null == this.lac || this.lac < lac) { - this.lac = lac; + long setLastAddConfirmed(long lac) { + long lacToReturn; + synchronized (this) { + if (null == this.lac || this.lac < lac) { + this.lac = lac; + setChanged(); + } + lacToReturn = this.lac; + } + LOG.trace("Updating LAC {} , {}", lacToReturn, lac); + + + notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn)); + return lacToReturn; + } + + synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observe) { + if ((null != lac && lac > previousLAC) + || isClosed || ((stateBits & STATE_FENCED_BIT) == STATE_FENCED_BIT)) { + LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC); + return null; } - return this.lac; + + addObserver(observe); + return this; } - public File getLf() { + public synchronized File getLf() { return lf; } @@ -170,15 +189,15 @@ class FileInfo { } bb.flip(); if (bb.getInt() != signature) { - throw new IOException("Missing ledger signature"); + throw new IOException("Missing ledger signature while reading header for " + lf); } int version = bb.getInt(); if (version != headerVersion) { - throw new IOException("Incompatible ledger version " + version); + throw new IOException("Incompatible ledger version " + version + " while reading header for " + lf); } int length = bb.getInt(); if (length < 0) { - throw new IOException("Length " + length + " is invalid"); + throw new IOException("Length " + length + " is invalid while reading header for " + lf); } else if (length > bb.remaining()) { throw new BufferUnderflowException(); } @@ -187,11 +206,17 @@ class FileInfo { stateBits = bb.getInt(); needFlushHeader = false; } else { - throw new IOException("Ledger index file does not 
exist"); + throw new IOException("Ledger index file " + lf +" does not exist"); } } - synchronized void checkOpen(boolean create) throws IOException { + @VisibleForTesting + void checkOpen(boolean create) throws IOException { + checkOpen(create, false); + } + + private synchronized void checkOpen(boolean create, boolean openBeforeClose) + throws IOException { if (fc != null) { return; } @@ -211,6 +236,10 @@ class FileInfo { } } } else { + if (openBeforeClose) { + // if it is checking for close, skip reading header + return; + } try { readHeader(); } catch (BufferUnderflowException buf) { @@ -246,19 +275,25 @@ class FileInfo { * @return true if set fence succeed, otherwise false when * it already fenced or failed to set fenced. */ - synchronized public boolean setFenced() throws IOException { - checkOpen(false); - if (LOG.isDebugEnabled()) { - LOG.debug("Try to set fenced state in file info {} : state bits {}.", lf, stateBits); - } - if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) { - // not fenced yet - stateBits |= STATE_FENCED_BIT; - needFlushHeader = true; - return true; - } else { - return false; + public boolean setFenced() throws IOException { + boolean returnVal = false; + synchronized (this) { + checkOpen(false); + if (LOG.isDebugEnabled()) { + LOG.debug("Try to set fenced state in file info {} : state bits {}.", lf, stateBits); + } + if ((stateBits & STATE_FENCED_BIT) != STATE_FENCED_BIT) { + // not fenced yet + stateBits |= STATE_FENCED_BIT; + needFlushHeader = true; + synchronized (this) { + setChanged(); + } + returnVal = true; + } } + notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE)); + return returnVal; } // flush the header when header is changed @@ -279,11 +314,28 @@ class FileInfo { return rc; } - public int read(ByteBuffer bb, long position) throws IOException { - return readAbsolute(bb, position + START_OF_DATA); + public int read(ByteBuffer bb, long position, boolean bestEffort) + throws IOException { + return 
readAbsolute(bb, position + START_OF_DATA, bestEffort); } - private int readAbsolute(ByteBuffer bb, long start) throws IOException { + /** + * Read data from position <i>start</i> to fill the byte buffer <i>bb</i>. + * If <i>bestEffort </i> is provided, it would return when it reaches EOF. + * Otherwise, it would throw {@link org.apache.bookkeeper.bookie.ShortReadException} + * if it reaches EOF. + * + * @param bb + * byte buffer of data + * @param start + * start position to read data + * @param bestEffort + * flag indicates if it is a best-effort read + * @return number of bytes read + * @throws IOException + */ + private int readAbsolute(ByteBuffer bb, long start, boolean bestEffort) + throws IOException { checkOpen(false); synchronized (this) { if (fc == null) { @@ -297,7 +349,11 @@ class FileInfo { rc = fc.read(bb, start); } if (rc <= 0) { - throw new IOException("Short read"); + if (bestEffort) { + return total; + } else { + throw new ShortReadException("Short read at " + getLf().getPath() + "@" + start); + } } total += rc; // should move read position @@ -307,23 +363,30 @@ class FileInfo { } /** - * Close a file info + * Close a file info. Generally, force should be set to true. If set to false metadata will not be flushed and + * accessing metadata before restart and recovery will be unsafe (since reloading from the index file will + * cause metadata to be lost). Setting force=false helps avoid expensive file create during shutdown with many + * dirty ledgers, and is safe because ledger metadata will be recovered before being accessed again. * * @param force * if set to true, the index is forced to create before closed, * if set to false, the index is not forced to create. */ - synchronized public void close(boolean force) throws IOException { - isClosed = true; - checkOpen(force); - // Any time when we force close a file, we should try to flush header. otherwise, we might lose fence bit. 
- if (force) { - flushHeader(); - } - if (useCount.get() == 0 && fc != null) { - fc.close(); - fc = null; + public void close(boolean force) throws IOException { + synchronized (this) { + isClosed = true; + checkOpen(force, true); + // Any time when we force close a file, we should try to flush header. otherwise, we might lose fence bit. + if (force) { + flushHeader(); + } + setChanged(); + if (useCount.get() == 0 && fc != null) { + fc.close(); + fc = null; + } } + notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE)); } synchronized public long write(ByteBuffer[] buffs, long position) throws IOException { @@ -429,7 +492,7 @@ class FileInfo { } } - public boolean delete() { + public synchronized boolean delete() { return lf.delete(); } @@ -443,7 +506,7 @@ class FileInfo { } } - public boolean isSameFile(File f) { + public synchronized boolean isSameFile(File f) { return this.lf.equals(f); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 81c37fd..323ad62 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.bookie; +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -28,9 +30,10 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.Observable; +import java.util.Observer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; - import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import 
org.apache.bookkeeper.conf.ServerConfiguration; @@ -41,10 +44,6 @@ import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - -import io.netty.buffer.ByteBuf; - import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS; @@ -333,6 +332,18 @@ public class IndexPersistenceMgr { } } } + + Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + FileInfo fi = null; + try { + fi = getFileInfo(ledgerId, null); + return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer); + } finally { + if (null != fi) { + fi.release(); + } + } + } long updateLastAddConfirmed(long ledgerId, long lac) throws IOException { FileInfo fi = null; @@ -626,7 +637,14 @@ public class IndexPersistenceMgr { if (position < 0) { position = 0; } - fi.read(bb, position); + // we read the last page from file size minus page size, so it should not encounter short read + // exception. if it does, it is an unexpected situation, then throw the exception and fail it immediately. 
+ try { + fi.read(bb, position, false); + } catch (ShortReadException sre) { + // throw a more meaningful exception with ledger id + throw new ShortReadException("Short read on ledger " + ledgerId + " : ", sre); + } bb.flip(); long startingEntryId = position / LedgerEntryPage.getIndexEntrySize(); for (int i = entriesPerPage - 1; i >= 0; i--) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index b8a6e53..3d84877 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -21,24 +21,23 @@ package org.apache.bookkeeper.bookie; +import com.google.common.collect.Lists; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; - import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; - +import java.util.Map; +import java.util.NavigableMap; +import java.util.Observable; +import java.util.Observer; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; - -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; - -import java.util.Map; -import java.util.NavigableMap; - import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; @@ -49,8 +48,6 @@ import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET; @@ -264,6 +261,12 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry } @Override + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer); + } + + + @Override synchronized public long addEntry(ByteBuf entry) throws IOException { long ledgerId = entry.getLong(entry.readerIndex() + 0); long entryId = entry.getLong(entry.readerIndex() + 8); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java new file mode 100644 index 0000000..81cd842 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.bookie; + +public class LastAddConfirmedUpdateNotification { + public long lastAddConfirmed; + public long timestamp; + + public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { + this.lastAddConfirmed = lastAddConfirmed; + this.timestamp = System.currentTimeMillis(); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java index c55592f..26d5245 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java @@ -21,10 +21,11 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; - -import io.netty.buffer.ByteBuf; +import java.util.Observable; +import java.util.Observer; /** * This class maps a ledger entry number into a location (entrylogid, offset) in @@ -48,6 +49,7 @@ interface LedgerCache extends Closeable { Long getLastAddConfirmed(long ledgerId) throws IOException; long updateLastAddConfirmed(long ledgerId, long lac) throws IOException; + Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException; void deleteLedger(long ledgerId) throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java index 8f1c56f..5709ce6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java @@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; +import java.util.Observable; +import java.util.Observer; import org.apache.bookkeeper.conf.ServerConfiguration; import 
org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -85,6 +87,12 @@ public class LedgerCacheImpl implements LedgerCache { } @Override + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer); + } + + + @Override public void putEntryOffset(long ledger, long entry, long offset) throws IOException { indexPageManager.putEntryOffset(ledger, entry, offset); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index 9fe1629..032dfe2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -24,6 +24,8 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.util.Observable; +import java.util.Observer; /** * Implements a ledger inside a bookie. 
In particular, it implements operations @@ -58,6 +60,7 @@ public abstract class LedgerDescriptor { abstract ByteBuf readEntry(long entryId) throws IOException; abstract long getLastAddConfirmed() throws IOException; + abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException; abstract void setExplicitLac(ByteBuf entry) throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index a1e0fc0..c2246bf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -22,10 +22,10 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; - import java.io.IOException; import java.util.Arrays; - +import java.util.Observable; +import java.util.Observer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,4 +101,9 @@ public class LedgerDescriptorImpl extends LedgerDescriptor { long getLastAddConfirmed() throws IOException { return ledgerStorage.getLastAddConfirmed(ledgerId); } + + @Override + Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException { + return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java index 2d6f80d..5aee2fe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java @@ -21,18 +21,22 @@ package org.apache.bookkeeper.bookie; -import org.apache.bookkeeper.proto.BookieProtocol; -import org.apache.bookkeeper.util.ZeroBuffer; - import 
java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.util.ZeroBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a page in the LedgerCache. It holds the locations * (entrylogfile, offset) for entry ids. */ public class LedgerEntryPage { + + private static final Logger LOG = LoggerFactory.getLogger(LedgerEntryPage.class); + private final static int indexEntrySize = 8; private final int pageSize; private final int entriesPerPage; @@ -153,11 +157,24 @@ public class LedgerEntryPage { public void readPage(FileInfo fi) throws IOException { checkPage(); page.clear(); - while(page.remaining() != 0) { - if (fi.read(page, getFirstEntryPosition()) <= 0) { - throw new IOException("Short page read of ledger " + getLedger() - + " tried to get " + page.capacity() + " from position " + getFirstEntryPosition() - + " still need " + page.remaining()); + try { + fi.read(page, getFirstEntryPosition(), true); + } catch (ShortReadException sre) { + throw new ShortReadException("Short page read of ledger " + getLedger() + + " tried to get " + page.capacity() + " from position " + + getFirstEntryPosition() + " still need " + page.remaining(), sre); + } catch (IllegalArgumentException iae) { + LOG.error("IllegalArgumentException when trying to read ledger {} from position {}" + , new Object[]{getLedger(), getFirstEntryPosition(), iae}); + throw iae; + } + // make sure we don't include partial index entry + if (page.remaining() != 0) { + LOG.info("Short page read of ledger {} : tried to read {} bytes from position {}, but only {} bytes read.", + new Object[] { getLedger(), page.capacity(), getFirstEntryPosition(), page.position() }); + if (page.position() % indexEntrySize != 0) { + int partialIndexEntryStart = page.position() - page.position() % indexEntrySize; + page.putLong(partialIndexEntryStart, 0L); } } last = getLastEntryIndex(); 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 9d2161e..9fc9390 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -22,9 +22,9 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; - import java.io.IOException; - +import java.util.Observable; +import java.util.Observer; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; @@ -118,6 +118,16 @@ public interface LedgerStorage { long getLastAddConfirmed(long ledgerId) throws IOException; /** + * Wait for last add confirmed update. + * + * @param previoisLAC - The threshold beyond which we would wait for the update + * @param observer - Observer to notify on update + * @return + * @throws IOException + */ + Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException; + + /** * Flushes all data in the storage. Once this is called, * add data written to the LedgerStorage up until this point * has been persisted to perminant storage diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java new file mode 100644 index 0000000..302cc46 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ShortReadException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.bookie; + +import java.io.IOException; + +/** + * Short Read Exception. Used to distinguish short read exception with other {@link java.io.IOException}s. + */ +public class ShortReadException extends IOException { + + private static final long serialVersionUID = -4201771547564923223L; + + public ShortReadException(String msg) { + super(msg); + } + + public ShortReadException(String msg, Throwable t) { + super(msg, t); + } + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java index 05067d0..8f5bdea 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/AsyncCallback.java @@ -139,6 +139,22 @@ public interface AsyncCallback { void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx); } + public interface ReadLastConfirmedAndEntryCallback { + /** + * Callback definition for bookie operation that allows reading the last add confirmed + * along with an entry within the last add confirmed range + * + * @param rc Return code + * @param lastConfirmed The entry id of the last confirmed write or + * {@link LedgerHandle#INVALID_ENTRY_ID INVALID_ENTRY_ID} + * if no entry has been confirmed + * @param entry The entry since the 
lastAddConfirmed entry that was specified when the request + * was initiated + * @param ctx context object + */ + void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx); + } + public interface RecoverCallback { /** * Callback definition for bookie recover operations diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 383fe3f..7895af2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -99,6 +99,8 @@ public class BookKeeper implements AutoCloseable { private OpStatsLogger deleteOpLogger; private OpStatsLogger recoverOpLogger; private OpStatsLogger readOpLogger; + private OpStatsLogger readLacAndEntryOpLogger; + private OpStatsLogger readLacAndEntryRespLogger; private OpStatsLogger addOpLogger; private OpStatsLogger writeLacOpLogger; private OpStatsLogger readLacOpLogger; @@ -137,8 +139,10 @@ public class BookKeeper implements AutoCloseable { final ClientConfiguration conf; final int explicitLacInterval; final boolean delayEnsembleChange; + final boolean reorderReadSequence; final Optional<SpeculativeRequestExecutionPolicy> readSpeculativeRequestPolicy; + final Optional<SpeculativeRequestExecutionPolicy> readLACSpeculativeRequestPolicy; // Close State boolean closed = false; @@ -301,6 +305,7 @@ public class BookKeeper implements AutoCloseable { throws IOException, InterruptedException, KeeperException { this.conf = conf; this.delayEnsembleChange = conf.getDelayEnsembleChange(); + this.reorderReadSequence = conf.isReorderReadSequenceEnabled(); // initialize zookeeper client if (zkc == null) { @@ -374,6 +379,15 @@ public class BookKeeper implements AutoCloseable { this.readSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent(); } + if 
(conf.getFirstSpeculativeReadLACTimeout() > 0) { + this.readLACSpeculativeRequestPolicy = + Optional.of((SpeculativeRequestExecutionPolicy)(new DefaultSpeculativeRequestExecutionPolicy( + conf.getFirstSpeculativeReadLACTimeout(), + conf.getMaxSpeculativeReadLACTimeout(), + conf.getSpeculativeReadLACTimeoutBackoffMultiplier()))); + } else { + this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent(); + } // initialize main worker pool this.mainWorkerPool = OrderedSafeExecutor.newBuilder() .name("BookKeeperClientWorker") @@ -504,6 +518,10 @@ public class BookKeeper implements AutoCloseable { return readSpeculativeRequestPolicy; } + public Optional<SpeculativeRequestExecutionPolicy> getReadLACSpeculativeRequestPolicy() { + return readLACSpeculativeRequestPolicy; + } + /** * Get the BookieClient, currently used for doing bookie recovery. * @@ -1275,6 +1293,8 @@ public class BookKeeper implements AutoCloseable { openOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.OPEN_OP); recoverOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.RECOVER_OP); readOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_OP); + readLacAndEntryOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY); + readLacAndEntryRespLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE); addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP); writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP); readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP); @@ -1287,6 +1307,8 @@ public class BookKeeper implements AutoCloseable { OpStatsLogger getDeleteOpLogger() { return deleteOpLogger; } OpStatsLogger getRecoverOpLogger() { return recoverOpLogger; } OpStatsLogger getReadOpLogger() { return readOpLogger; } + OpStatsLogger getReadLacAndEntryOpLogger() { return readLacAndEntryOpLogger; } + OpStatsLogger getReadLacAndEntryRespLogger() { 
return readLacAndEntryRespLogger; } OpStatsLogger getAddOpLogger() { return addOpLogger; } OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; } OpStatsLogger getReadLacOpLogger() { return readLacOpLogger; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java index e98b2d6..15c6248 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java @@ -39,6 +39,8 @@ public interface BookKeeperClientStats { public final static String READ_OP = "READ_ENTRY"; public final static String WRITE_LAC_OP = "WRITE_LAC"; public final static String READ_LAC_OP = "READ_LAC"; + public final static String READ_LAST_CONFIRMED_AND_ENTRY = "READ_LAST_CONFIRMED_AND_ENTRY"; + public final static String READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE = "READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE"; public final static String PENDING_ADDS = "NUM_PENDING_ADD"; public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE"; public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index bd6c7d6..a056043 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -22,6 +22,9 @@ package org.apache.bookkeeper.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; 
import io.netty.buffer.ByteBuf; @@ -39,6 +42,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; @@ -78,6 +82,7 @@ public class LedgerHandle implements AutoCloseable { final DigestManager macManager; final DistributionSchedule distributionSchedule; final RateLimiter throttler; + final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory; final boolean enableParallelRecoveryRead; final int recoveryReadBatchSize; @@ -138,6 +143,13 @@ public class LedgerHandle implements AutoCloseable { this.ledgerKey = password.length > 0 ? MacDigestManager.genDigest("ledger", password) : emptyLedgerKey; distributionSchedule = new RoundRobinDistributionSchedule( metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize()); + this.bookieFailureHistory = CacheBuilder.newBuilder() + .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS) + .build(new CacheLoader<BookieSocketAddress, Long>() { + public Long load(BookieSocketAddress key) { + return -1L; + } + }); ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES); lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS); @@ -957,6 +969,81 @@ public class LedgerHandle implements AutoCloseable { new TryReadLastConfirmedOp(this, innercb, getLastAddConfirmed()).initiate(); } + + /** + * Asynchronous read next entry and the latest last add confirmed. + * If the next entryId is less than known last add confirmed, the call will read next entry directly. 
+ * If the next entryId is ahead of known last add confirmed, the call will issue a long poll read + * to wait for the next entry <i>entryId</i>. + * + * The callback will return the latest last add confirmed and next entry if it is available within timeout period <i>timeOutInMillis</i>. + * + * @param entryId + * next entry id to read + * @param timeOutInMillis + * timeout period to wait for the entry id to be available (for long poll only) + * @param parallel + * whether to issue the long poll reads in parallel + * @param cb + * callback to return the result + * @param ctx + * callback context + */ + public void asyncReadLastConfirmedAndEntry(final long entryId, + final long timeOutInMillis, + final boolean parallel, + final AsyncCallback.ReadLastConfirmedAndEntryCallback cb, + final Object ctx) { + boolean isClosed; + long lac; + synchronized (this) { + isClosed = metadata.isClosed(); + lac = metadata.getLastEntryId(); + } + if (isClosed) { + if (entryId > lac) { + cb.readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx); + return; + } + } else { + lac = getLastAddConfirmed(); + } + if (entryId <= lac) { + asyncReadEntries(entryId, entryId, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + if (BKException.Code.OK == rc) { + if (seq.hasMoreElements()) { + cb.readLastConfirmedAndEntryComplete(rc, getLastAddConfirmed(), seq.nextElement(), ctx); + } else { + cb.readLastConfirmedAndEntryComplete(rc, getLastAddConfirmed(), null, ctx); + } + } else { + cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx); + } + } + }, ctx); + return; + } + // wait for entry <i>entryId</i> + ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() { + AtomicBoolean completed = new AtomicBoolean(false); + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, 
LedgerEntry entry) { + if (rc == BKException.Code.OK) { + if (completed.compareAndSet(false, true)) { + cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry, ctx); + } + } else { + if (completed.compareAndSet(false, true)) { + cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx); + } + } + } + }; + new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.scheduler).parallelRead(parallel).initiate(); + } + /** * Context objects for synchronous call to read last confirmed. */ @@ -1545,6 +1632,13 @@ public class LedgerHandle implements AutoCloseable { bk.getLedgerManager().readLedgerMetadata(ledgerId, cb); } + void registerOperationFailureOnBookie(BookieSocketAddress bookie, long entryId) { + if (bk.getConf().getEnableBookieFailureTracking()) { + bookieFailureHistory.put(bookie, entryId); + } + } + + void recover(GenericCallback<Void> finalCb) { recover(finalCb, null, false); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index f2477c1..c820122 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -92,7 +92,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { super(lId, eId); this.ensemble = ensemble; - this.writeSet = lh.distributionSchedule.getWriteSet(entryId); + + if (lh.bk.reorderReadSequence) { + this.writeSet = lh.bk.placementPolicy.reorderReadSequence(ensemble, + lh.distributionSchedule.getWriteSet(entryId), lh.bookieFailureHistory.asMap()); + } else { + this.writeSet = lh.distributionSchedule.getWriteSet(entryId); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java similarity index 
51% copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java index f2477c1..2fab694 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java @@ -1,4 +1,4 @@ -/* +/** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,63 +21,47 @@ package org.apache.bookkeeper.client; import io.netty.buffer.ByteBuf; - import java.util.ArrayList; import java.util.BitSet; -import java.util.Enumeration; -import java.util.HashSet; import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; -import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx; -import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Sequence of entries of a ledger that represents a pending read operation. 
- * When all the data read has come back, the application callback is called. - * This class could be improved because we could start pushing data to the - * application as soon as it arrives rather than waiting for the whole thing. - * - */ -class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { - private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); +public class ReadLastConfirmedAndEntryOp implements BookkeeperInternalCallbacks.ReadEntryCallback, SpeculativeRequestExectuor { + static final Logger LOG = LoggerFactory.getLogger(ReadLastConfirmedAndEntryOp.class); final private ScheduledExecutorService scheduler; - private ScheduledFuture<?> speculativeTask = null; - Queue<LedgerEntryRequest> seq; - Set<BookieSocketAddress> heardFromHosts; - BitSet heardFromHostsBitSet; - ReadCallback cb; - Object ctx; - LedgerHandle lh; - long numPendingEntries; - long startEntryId; - long endEntryId; - long requestTimeNanos; - OpStatsLogger readOpLogger; - + ReadLACAndEntryRequest request; + final BitSet heardFromHostsBitSet; + final BitSet emptyResponsesFromHostsBitSet; final int maxMissedReadsAllowed; boolean parallelRead = false; - final AtomicBoolean complete = new AtomicBoolean(false); + final AtomicBoolean requestComplete = new AtomicBoolean(false); + + final long requestTimeNano; + private final LedgerHandle lh; + private final LastConfirmedAndEntryCallback cb; - abstract class LedgerEntryRequest extends LedgerEntry implements SpeculativeRequestExectuor { + private int numResponsesPending; + private final int numEmptyResponsesAllowed; + private volatile boolean hasValidResponse = false; + private final long prevEntryId; + private long lastAddConfirmed; + private long timeOutInMillis; + + abstract class ReadLACAndEntryRequest extends LedgerEntry { final AtomicBoolean complete = new AtomicBoolean(false); @@ -87,12 +71,23 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { final 
ArrayList<BookieSocketAddress> ensemble; final List<Integer> writeSet; + final List<Integer> orderedEnsemble; - LedgerEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) { + ReadLACAndEntryRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) { super(lId, eId); this.ensemble = ensemble; this.writeSet = lh.distributionSchedule.getWriteSet(entryId); + if (lh.bk.reorderReadSequence) { + this.orderedEnsemble = lh.bk.placementPolicy.reorderReadLACSequence(ensemble, + writeSet, lh.bookieFailureHistory.asMap()); + } else { + this.orderedEnsemble = writeSet; + } + } + + synchronized int getFirstError() { + return firstError; } /** @@ -112,11 +107,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { * @return return true if we managed to complete the entry; * otherwise return false if the read entry is not complete or it is already completed before */ - boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer) { + boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer, long entryId) { ByteBuf content; try { content = lh.macManager.verifyDigestAndReturnData(entryId, buffer); - } catch (BKDigestMatchException e) { + } catch (BKException.BKDigestMatchException e) { logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); buffer.release(); return false; @@ -124,6 +119,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { if (!complete.getAndSet(true)) { rc = BKException.Code.OK; + this.entryId = entryId; /* * The length is a long and it is the last field of the metadata of an entry. * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. 
@@ -132,7 +128,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { data = content; return true; } else { - buffer.release(); return false; } } @@ -147,13 +142,28 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { boolean fail(int rc) { if (complete.compareAndSet(false, true)) { this.rc = rc; - submitCallback(rc); + translateAndSetFirstError(rc); + completeRequest(); return true; } else { return false; } } + synchronized private void translateAndSetFirstError(int rc) { + if (BKException.Code.OK == firstError || + BKException.Code.NoSuchEntryException == firstError || + BKException.Code.NoSuchLedgerExistsException == firstError) { + firstError = rc; + } else if (BKException.Code.BookieHandleNotAvailableException == firstError && + BKException.Code.NoSuchEntryException != rc && + BKException.Code.NoSuchLedgerExistsException != rc) { + // if other exception rather than NoSuchEntryException is returned + // we need to update firstError to indicate that it might be a valid read but just failed. + firstError = rc; + } + } + /** * Log error <i>errMsg</i> and reattempt read from <i>host</i>. * @@ -167,30 +177,22 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { * read result code */ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) { - if (BKException.Code.OK == firstError || - BKException.Code.NoSuchEntryException == firstError || - BKException.Code.NoSuchLedgerExistsException == firstError) { - firstError = rc; - } else if (BKException.Code.BookieHandleNotAvailableException == firstError && - BKException.Code.NoSuchEntryException != rc && - BKException.Code.NoSuchLedgerExistsException != rc) { - // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is - // returned we need to update firstError to indicate that it might be a valid read but just - // failed. 
- firstError = rc; - } + translateAndSetFirstError(rc); + if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) { - ++numMissedEntryReads; - if (LOG.isDebugEnabled()) { - LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", - new Object[] { lh.ledgerId, entryId, host }); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(errMsg + " while reading L{} E{} from bookie: {}", - new Object[]{lh.ledgerId, entryId, host}); + // Since we send all long poll requests to every available node, we should only + // treat these errors as failures if the node from which we received this is part of + // the writeSet + if (this.writeSet.contains(bookieIndex)) { + lh.registerOperationFailureOnBookie(host, entryId); } + ++numMissedEntryReads; + } + + if (LOG.isDebugEnabled()) { + LOG.debug(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: " + + host); } } @@ -226,43 +228,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { public String toString() { return String.format("L%d-E%d", ledgerId, entryId); } - - /** - * Issues a speculative request and indicates if more speculative - * requests should be issued - * - * @return whether more speculative requests should be issued - */ - @Override - public ListenableFuture<Boolean> issueSpeculativeRequest() { - return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send speculative read for {}. 
Hosts heard are {}, ensemble is {}.", - new Object[] { this, heardFromHostsBitSet, ensemble }); - } - return true; - } - return false; - } - }); - } } - class ParallelReadRequest extends LedgerEntryRequest { + class ParallelReadRequest extends ReadLACAndEntryRequest { int numPendings; ParallelReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) { super(ensemble, lId, eId); - numPendings = writeSet.size(); + numPendings = orderedEnsemble.size(); } @Override void read() { - for (int bookieIndex : writeSet) { + for (int bookieIndex : orderedEnsemble) { BookieSocketAddress to = ensemble.get(bookieIndex); try { sendReadTo(bookieIndex, to, this); @@ -297,18 +276,20 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } } - class SequenceReadRequest extends LedgerEntryRequest { + class SequenceReadRequest extends ReadLACAndEntryRequest { final static int NOT_FOUND = -1; int nextReplicaIndexToReadFrom = 0; final BitSet sentReplicas; final BitSet erroredReplicas; + final BitSet emptyResponseReplicas; SequenceReadRequest(ArrayList<BookieSocketAddress> ensemble, long lId, long eId) { super(ensemble, lId, eId); - this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); - this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.sentReplicas = new BitSet(orderedEnsemble.size()); + this.erroredReplicas = new BitSet(orderedEnsemble.size()); + this.emptyResponseReplicas = new BitSet(orderedEnsemble.size()); } private synchronized int getNextReplicaIndexToReadFrom() { @@ -316,7 +297,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } private int getReplicaIndex(int bookieIndex) { - return writeSet.indexOf(bookieIndex); + return orderedEnsemble.indexOf(bookieIndex); } private BitSet getSentToBitSet() { @@ -324,14 +305,14 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { for (int i = 0; i < sentReplicas.length(); i++) { if 
(sentReplicas.get(i)) { - b.set(writeSet.get(i)); + b.set(orderedEnsemble.get(i)); } } return b; } private boolean readsOutstanding() { - return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + return (sentReplicas.cardinality() - erroredReplicas.cardinality() - emptyResponseReplicas.cardinality()) > 0; } /** @@ -341,7 +322,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { */ @Override synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) { - if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getEnsembleSize()) { return null; } @@ -363,7 +344,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } synchronized BookieSocketAddress sendNextRead() { - if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getEnsembleSize()) { // we are done, the read has failed from all replicas, just fail the // read @@ -379,7 +360,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } int replica = nextReplicaIndexToReadFrom; - int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom); + int bookieIndex = orderedEnsemble.get(nextReplicaIndexToReadFrom); nextReplicaIndexToReadFrom++; try { @@ -404,7 +385,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); return; } - erroredReplicas.set(replica); + + if (BKException.Code.OK == rc) { + emptyResponseReplicas.set(replica); + } else { + erroredReplicas.set(replica); + } if (!readsOutstanding()) { sendNextRead(); @@ -413,78 +399,101 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } - PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler, - long 
startEntryId, long endEntryId, ReadCallback cb, Object ctx) { - seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId)); - this.cb = cb; - this.ctx = ctx; + ReadLastConfirmedAndEntryOp(LedgerHandle lh, + LastConfirmedAndEntryCallback cb, + long prevEntryId, + long timeOutInMillis, + ScheduledExecutorService scheduler) { this.lh = lh; - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; + this.cb = cb; + this.prevEntryId = prevEntryId; + this.lastAddConfirmed = lh.getLastAddConfirmed(); + this.timeOutInMillis = timeOutInMillis; + this.numResponsesPending = 0; + this.numEmptyResponsesAllowed = getLedgerMetadata().getWriteQuorumSize() + - getLedgerMetadata().getAckQuorumSize() + 1; + this.requestTimeNano = MathUtils.nowInNano(); this.scheduler = scheduler; - numPendingEntries = endEntryId - startEntryId + 1; - maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize() - - getLedgerMetadata().getAckQuorumSize(); - heardFromHosts = new HashSet<>(); + maxMissedReadsAllowed = getLedgerMetadata().getEnsembleSize() + - getLedgerMetadata().getAckQuorumSize(); heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); - - readOpLogger = lh.bk.getReadOpLogger(); + emptyResponsesFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); } protected LedgerMetadata getLedgerMetadata() { return lh.metadata; } - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - PendingReadOp parallelRead(boolean enabled) { + ReadLastConfirmedAndEntryOp parallelRead(boolean enabled) { this.parallelRead = enabled; return this; } - public void initiate() { - long nextEnsembleChange = startEntryId, i = startEntryId; - this.requestTimeNanos = MathUtils.nowInNano(); - ArrayList<BookieSocketAddress> ensemble = null; - - do { - if (i == nextEnsembleChange) { - ensemble = 
getLedgerMetadata().getEnsemble(i); - nextEnsembleChange = getLedgerMetadata().getNextEnsembleChange(i); - } - LedgerEntryRequest entry; - if (parallelRead) { - entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); - } else { - entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); - } - seq.add(entry); - i++; - } while (i <= endEntryId); - // read the entries. - for (LedgerEntryRequest entry : seq) { - entry.read(); - if (!parallelRead && lh.bk.getReadSpeculativeRequestPolicy().isPresent()) { - lh.bk.getReadSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, entry); + /** + * Speculative Read Logic + */ + @Override + public ListenableFuture<Boolean> issueSpeculativeRequest() { + return lh.bk.mainWorkerPool.submitOrdered(lh.getId(), new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + if (!requestComplete.get() && !request.isComplete() && + (null != request.maybeSendSpeculativeRead(heardFromHostsBitSet))) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send speculative ReadLAC {} for ledger {} (previousLAC: {}). 
Hosts heard are {}.", + new Object[] {request, lh.getId(), lastAddConfirmed, heardFromHostsBitSet }); + } + return true; + } + return false; } + }); + } + + public void initiate() { + if (parallelRead) { + request = new ParallelReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1); + } else { + request = new SequenceReadRequest(lh.metadata.currentEnsemble, lh.ledgerId, prevEntryId + 1); } + request.read(); + + if (!parallelRead && lh.bk.getReadLACSpeculativeRequestPolicy().isPresent()) { + lh.bk.getReadLACSpeculativeRequestPolicy().get().initiateSpeculativeRequest(scheduler, this); + } + } + + void sendReadTo(int bookieIndex, BookieSocketAddress to, ReadLACAndEntryRequest entry) throws InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("Calling Read LAC and Entry with {} and long polling interval {} on Bookie {} - Parallel {}", + new Object[] { prevEntryId, timeOutInMillis, to, parallelRead }); + } + lh.bk.bookieClient.readEntryWaitForLACUpdate(to, + lh.ledgerId, + BookieProtocol.LAST_ADD_CONFIRMED, + prevEntryId, + timeOutInMillis, + true, + this, new ReadLastConfirmedAndEntryContext(bookieIndex, to)); + this.numResponsesPending++; } - private static class ReadContext implements ReadEntryCallbackCtx { + /** + * Wrapper to get all recovered data from the request + */ + interface LastConfirmedAndEntryCallback { + public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry); + } + + public static class ReadLastConfirmedAndEntryContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx { final int bookieIndex; - final BookieSocketAddress to; - final LedgerEntryRequest entry; + final BookieSocketAddress bookie; long lac = LedgerHandle.INVALID_ENTRY_ID; + Optional<Long> lacUpdateTimestamp = Optional.absent(); - ReadContext(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) { + ReadLastConfirmedAndEntryContext(int bookieIndex, BookieSocketAddress bookie) { this.bookieIndex = 
bookieIndex; - this.to = to; - this.entry = entry; + this.bookie = bookie; } @Override @@ -496,84 +505,116 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { public long getLastAddConfirmed() { return lac; } - } - void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException { - if (lh.throttler != null) { - lh.throttler.acquire(); + public Optional<Long> getLacUpdateTimestamp() { + return lacUpdateTimestamp; } - lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, - this, new ReadContext(bookieIndex, to, entry)); - } - - @Override - public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { - final ReadContext rctx = (ReadContext)ctx; - final LedgerEntryRequest entry = rctx.entry; - - if (rc != BKException.Code.OK) { - entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); - return; + public void setLacUpdateTimestamp(long lacUpdateTimestamp) { + this.lacUpdateTimestamp = Optional.of(lacUpdateTimestamp); } - heardFromHosts.add(rctx.to); - heardFromHostsBitSet.set(rctx.bookieIndex, true); - if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { - lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); - submitCallback(BKException.Code.OK); - } + } - if(numPendingEntries < 0) - LOG.error("Read too many values for ledger {} : [{}, {}].", new Object[] { ledgerId, - startEntryId, endEntryId }); + private void submitCallback(int rc, long lastAddConfirmed, LedgerEntry entry) { + long latencyMicros = MathUtils.elapsedMicroSec(requestTimeNano); + if (BKException.Code.OK != rc) { + lh.bk.getReadLacAndEntryOpLogger() + .registerFailedEvent(latencyMicros, TimeUnit.MICROSECONDS); + } else { + lh.bk.getReadLacAndEntryOpLogger() + .registerSuccessfulEvent(latencyMicros, TimeUnit.MICROSECONDS); + } + cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry); } - protected void 
submitCallback(int code) { - if (BKException.Code.OK == code) { - numPendingEntries--; - if (numPendingEntries != 0) { - return; + @Override + public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} received response for (lid={}, eid={}) : {}", + new Object[] { getClass().getName(), ledgerId, entryId, rc }); + } + ReadLastConfirmedAndEntryContext rCtx = (ReadLastConfirmedAndEntryContext) ctx; + BookieSocketAddress bookie = rCtx.bookie; + numResponsesPending--; + if (BKException.Code.OK == rc) { + if (LOG.isTraceEnabled()) { + LOG.trace("Received lastAddConfirmed (lac={}) from bookie({}) for (lid={}).", + new Object[] { rCtx.getLastAddConfirmed(), bookie, ledgerId }); } - } - // ensure callback once - if (!complete.compareAndSet(false, true)) { - return; - } + if (rCtx.getLastAddConfirmed() > lastAddConfirmed) { + lastAddConfirmed = rCtx.getLastAddConfirmed(); + lh.updateLastConfirmed(rCtx.getLastAddConfirmed(), 0L); + } + + hasValidResponse = true; - long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); - if (code != BKException.Code.OK) { - long firstUnread = LedgerHandle.INVALID_ENTRY_ID; - for (LedgerEntryRequest req : seq) { - if (!req.isComplete()) { - firstUnread = req.getEntryId(); - break; + if (entryId != BookieProtocol.LAST_ADD_CONFIRMED) { + if (request.complete(rCtx.bookieIndex, bookie, buffer, entryId)) { + // callback immediately + if (rCtx.getLacUpdateTimestamp().isPresent()) { + long elapsedMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis() - rCtx.getLacUpdateTimestamp().get()); + elapsedMicros = Math.max(elapsedMicros, 0); + lh.bk.getReadLacAndEntryRespLogger() + .registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS); + } + + submitCallback(BKException.Code.OK, lastAddConfirmed, request); + requestComplete.set(true); + heardFromHostsBitSet.set(rCtx.bookieIndex, true); } + } else { + 
emptyResponsesFromHostsBitSet.set(rCtx.bookieIndex, true); + if (lastAddConfirmed > prevEntryId) { + // received advanced lac + completeRequest(); + } else if(emptyResponsesFromHostsBitSet.cardinality() >= numEmptyResponsesAllowed) { + if (LOG.isDebugEnabled()) { + LOG.debug("Completed readLACAndEntry(lid = {}, previousEntryId = {}) after received {} empty responses ('{}').", + new Object[]{ledgerId, prevEntryId, emptyResponsesFromHostsBitSet.cardinality(), emptyResponsesFromHostsBitSet}); + } + completeRequest(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Received empty response for readLACAndEntry(lid = {}, previousEntryId = {}) from" + + " bookie {} @ {}, reattempting reading next bookie : lac = {}", + new Object[]{ledgerId, prevEntryId, rCtx.bookieIndex, + rCtx.bookie, lastAddConfirmed}); + } + request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie, "Empty Response", rc); + } + return; } - LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} : bitset = {}. 
First unread entry is {}", - new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, heardFromHostsBitSet, firstUnread }); - readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + } else if (BKException.Code.UnauthorizedAccessException == rc && !requestComplete.get()) { + submitCallback(rc, lastAddConfirmed, null); + requestComplete.set(true); } else { - readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + request.logErrorAndReattemptRead(rCtx.bookieIndex, bookie, "Error: " + BKException.getMessage(rc), rc); + return; + } + + if (numResponsesPending <= 0) { + completeRequest(); } - cancelSpeculativeTask(true); - cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx); - cb = null; } - @Override - public boolean hasMoreElements() { - return !seq.isEmpty(); + private void completeRequest() { + if (requestComplete.compareAndSet(false, true)) { + if (!hasValidResponse) { + // no success called + submitCallback(request.getFirstError(), lastAddConfirmed, null); + } else { + // callback + submitCallback(BKException.Code.OK, lastAddConfirmed, null); + } + } } @Override - public LedgerEntry nextElement() throws NoSuchElementException { - return seq.remove(); + public String toString() { + return String.format("ReadLastConfirmedAndEntryOp(lid=%d, prevEntryId=%d])", lh.ledgerId, prevEntryId); } - public int size() { - return seq.size(); - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 038437a..157c6b5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -20,7 +20,9 @@ package org.apache.bookkeeper.conf; import static com.google.common.base.Charsets.UTF_8; import static 
org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -66,8 +68,12 @@ public class ClientConfiguration extends AbstractConfiguration { protected final static String FIRST_SPECULATIVE_READ_TIMEOUT = "firstSpeculativeReadTimeout"; protected final static String MAX_SPECULATIVE_READ_TIMEOUT = "maxSpeculativeReadTimeout"; protected final static String SPECULATIVE_READ_TIMEOUT_BACKOFF_MULTIPLIER = "speculativeReadTimeoutBackoffMultiplier"; + protected final static String FIRST_SPECULATIVE_READ_LAC_TIMEOUT = "firstSpeculativeReadLACTimeout"; + protected final static String MAX_SPECULATIVE_READ_LAC_TIMEOUT = "maxSpeculativeReadLACTimeout"; + protected final static String SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER = "speculativeReadLACTimeoutBackoffMultiplier"; protected final static String ENABLE_PARALLEL_RECOVERY_READ = "enableParallelRecoveryRead"; protected final static String RECOVERY_READ_BATCH_SIZE = "recoveryReadBatchSize"; + protected final static String REORDER_READ_SEQUENCE_ENABLED = "reorderReadSequenceEnabled"; // Add Parameters protected final static String DELAY_ENSEMBLE_CHANGE = "delayEnsembleChange"; // Timeout Setting @@ -103,7 +109,11 @@ public class ClientConfiguration extends AbstractConfiguration { // Stats protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats"; protected final static String TASK_EXECUTION_WARN_TIME_MICROS = "taskExecutionWarnTimeMicros"; - + + // Failure History Settings + protected final static String ENABLE_BOOKIE_FAILURE_TRACKING = "enableBookieFailureTracking"; + protected final static String BOOKIE_FAILURE_HISTORY_EXPIRATION_MS = "bookieFailureHistoryExpirationMSec"; + // Names of dynamic features protected final static String DISABLE_ENSEMBLE_CHANGE_FEATURE_NAME = 
"disableEnsembleChangeFeatureName"; @@ -846,6 +856,27 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Multiplier to use when determining time between successive speculative read LAC requests + * + * @return speculative read LAC timeout backoff multiplier. + */ + public float getSpeculativeReadLACTimeoutBackoffMultiplier() { + return getFloat(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER, 2.0f); + } + + /** + * Set the multiplier to use when determining time between successive speculative read LAC requests + * + * @param speculativeReadLACTimeoutBackoffMultiplier + * multiplier to use when determining time between successive speculative read LAC requests. + * @return client configuration. + */ + public ClientConfiguration setSpeculativeReadLACTimeoutBackoffMultiplier(float speculativeReadLACTimeoutBackoffMultiplier) { + setProperty(SPECULATIVE_READ_LAC_TIMEOUT_BACKOFF_MULTIPLIER, speculativeReadLACTimeoutBackoffMultiplier); + return this; + } + + /** * Get the max speculative read timeout. * * @return max speculative read timeout. @@ -867,6 +898,66 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Get the period of time after which the first speculative read last add confirmed and entry + * should be triggered. + * A speculative entry request is sent to the next replica bookie before + * an error or response has been received for the previous entry read request. + * + * A speculative entry read is only sent if we have not heard from the current + * replica bookie during the entire read operation which may comprise many entries. + * + * Speculative requests allow the client to avoid having to wait for the connect timeout + * in the case that a bookie has failed. It induces higher load on the network and on + * bookies. This should be taken into account before changing this configuration value. + * + * @return the speculative request timeout in milliseconds. Default 1500. 
+ */ + public int getFirstSpeculativeReadLACTimeout() { + return getInt(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, 1500); + } + + + /** + * Get the maximum interval between successive speculative read last add confirmed and entry + * requests. + * + * @return the max speculative request timeout in milliseconds. Default 5000. + */ + public int getMaxSpeculativeReadLACTimeout() { + return getInt(MAX_SPECULATIVE_READ_LAC_TIMEOUT, 5000); + } + + /** + * Set the period of time after which the first speculative read last add confirmed and entry + * should be triggered. + * A lower timeout will reduce read latency in the case of a failed bookie, + * while increasing the load on bookies and the network. + * + * The default is 1500 milliseconds. A value of 0 will disable speculative reads + * completely. + * + * @see #getSpeculativeReadTimeout() + * @param timeout the timeout value, in milliseconds + * @return client configuration + */ + public ClientConfiguration setFirstSpeculativeReadLACTimeout(int timeout) { + setProperty(FIRST_SPECULATIVE_READ_LAC_TIMEOUT, timeout); + return this; + } + + /** + * Set the maximum interval between successive speculative read last add confirmed and entry + * requests. + * + * @param timeout the timeout value, in milliseconds + * @return client configuration + */ + public ClientConfiguration setMaxSpeculativeReadLACTimeout(int timeout) { + setProperty(MAX_SPECULATIVE_READ_LAC_TIMEOUT, timeout); + return this; + } + + /** * Whether to enable parallel reading in recovery read. * * @return true if enable parallel reading in recovery read. otherwise, return false. @@ -909,6 +1000,34 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * If reorder read sequence enabled or not. + * + * @return true if reorder read sequence is enabled, otherwise false. 
+ */ + public boolean isReorderReadSequenceEnabled() { + return getBoolean(REORDER_READ_SEQUENCE_ENABLED, false); + } + + /** + * Enable/disable reordering read sequence on reading entries. + * + * <p>If this flag is enabled, the client will use + * {@link EnsemblePlacementPolicy#reorderReadSequence(ArrayList, List, Map)} + * to figure out a better read sequence to attempt reads from replicas and use + * {@link EnsemblePlacementPolicy#reorderReadLACSequence(ArrayList, List, Map)} + * to figure out a better read sequence to attempt long poll reads from replicas. + * + * <p>The order of read sequence is determined by the placement policy implementations. + * + * @param enabled the flag to enable/disable reorder read sequence. + * @return client configuration instance. + */ + public ClientConfiguration setReorderReadSequenceEnabled(boolean enabled) { + setProperty(REORDER_READ_SEQUENCE_ENABLED, enabled); + return this; + } + + /** * Get Ensemble Placement Policy Class. * * @return ensemble placement policy class. @@ -1240,6 +1359,48 @@ public class ClientConfiguration extends AbstractConfiguration { } /** + * Whether to enable bookie failure tracking + * + * @return flag to enable/disable bookie failure tracking + */ + public boolean getEnableBookieFailureTracking() { + return getBoolean(ENABLE_BOOKIE_FAILURE_TRACKING, true); + } + + /** + * Enable/Disable bookie failure tracking. + * + * @param enabled + * flag to enable/disable bookie failure tracking + * @return client configuration. + */ + public ClientConfiguration setEnableBookieFailureTracking(boolean enabled) { + setProperty(ENABLE_BOOKIE_FAILURE_TRACKING, enabled); + return this; + } + + /** + * Get the bookie failure tracking expiration timeout. + * + * @return bookie failure tracking expiration timeout. + */ + public int getBookieFailureHistoryExpirationMSec() { + return getInt(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, 60000); + } + + /** + * Set the bookie failure tracking expiration timeout. 
+ * + * @param expirationMSec + * bookie failure tracking expiration timeout. + * @return client configuration. + */ + public ClientConfiguration setBookieFailureHistoryExpirationMSec(int expirationMSec) { + setProperty(BOOKIE_FAILURE_HISTORY_EXPIRATION_MS, expirationMSec); + return this; + } + + /** * Get the name of the dynamic feature that disables ensemble change * * @return name of the dynamic feature that disables ensemble change diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index b4dd066..e5c96e8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -115,6 +115,11 @@ public class ServerConfiguration extends AbstractConfiguration { // Worker Thread parameters. protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads"; protected final static String NUM_READ_WORKER_THREADS = "numReadWorkerThreads"; + protected final static String NUM_LONG_POLL_WORKER_THREADS = "numLongPollWorkerThreads"; + + // Long poll parameters + protected final static String REQUEST_TIMER_TICK_DURATION_MILLISEC = "requestTimerTickDurationMs"; + protected final static String REQUEST_TIMER_NO_OF_TICKS = "requestTimerNumTicks"; protected final static String READ_BUFFER_SIZE = "readBufferSizeBytes"; protected final static String WRITE_BUFFER_SIZE = "writeBufferSizeBytes"; @@ -1131,6 +1136,26 @@ public class ServerConfiguration extends AbstractConfiguration { } /** + * Set the number of threads that should handle long poll requests + * + * @param numThreads + * number of threads to handle long poll requests. 
+ * @return server configuration + */ + public ServerConfiguration setNumLongPollWorkerThreads(int numThreads) { + setProperty(NUM_LONG_POLL_WORKER_THREADS, numThreads); + return this; + } + + /** + * Get the number of threads that should handle long poll requests. + * @return number of threads that should handle long poll requests. Default 10. + */ + public int getNumLongPollWorkerThreads() { + return getInt(NUM_LONG_POLL_WORKER_THREADS, 10); + } + + /** * Set the number of threads that would handle read requests. * * @param numThreads @@ -1148,6 +1173,46 @@ public class ServerConfiguration extends AbstractConfiguration { public int getNumReadWorkerThreads() { return getInt(NUM_READ_WORKER_THREADS, 8); } + + /** + * Set the tick duration in milliseconds + * + * @param tickDuration + * tick duration in milliseconds. + * @return server configuration + */ + public ServerConfiguration setRequestTimerTickDurationMs(int tickDuration) { + setProperty(REQUEST_TIMER_TICK_DURATION_MILLISEC, tickDuration); + return this; + } + + /** + * Get the tick duration in milliseconds. + * @return tick duration in milliseconds. Default 10. + */ + public int getRequestTimerTickDurationMs() { + return getInt(REQUEST_TIMER_TICK_DURATION_MILLISEC, 10); + } + + /** + * Set the number of ticks per wheel for the request timer. + * + * @param tickCount + * number of ticks per wheel for the request timer. + * @return server configuration + */ + public ServerConfiguration setRequestTimerNumTicks(int tickCount) { + setProperty(REQUEST_TIMER_NO_OF_TICKS, tickCount); + return this; + } + + /** + * Get the number of ticks per wheel for the request timer. + * @return number of ticks per wheel for the request timer. Default 1024. + */ + public int getRequestTimerNumTicks() { + return getInt(REQUEST_TIMER_NO_OF_TICKS, 1024); + } /** * Get the number of bytes used as capacity for the write buffer. 
Default is diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 1376048..4cbd814 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -364,17 +364,7 @@ public class BookieClient implements PerChannelBookieClientFactory { @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { if (rc != BKException.Code.OK) { - try { - executor.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - cb.readEntryComplete(rc, ledgerId, entryId, null, ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.readEntryComplete(getRc(BKException.Code.InterruptedException), - ledgerId, entryId, null, ctx); - } + completeRead(rc, ledgerId, entryId, null, cb, ctx); return; } pcbc.readEntry(ledgerId, entryId, cb, ctx); @@ -384,6 +374,40 @@ public class BookieClient implements PerChannelBookieClientFactory { closeLock.readLock().unlock(); } } + + + public void readEntryWaitForLACUpdate(final BookieSocketAddress addr, + final long ledgerId, + final long entryId, + final long previousLAC, + final long timeOutInMillis, + final boolean piggyBackEntry, + final ReadEntryCallback cb, + final Object ctx) { + closeLock.readLock().lock(); + try { + final PerChannelBookieClientPool client = lookupClient(addr, entryId); + if (client == null) { + completeRead(BKException.Code.BookieHandleNotAvailableException, + ledgerId, entryId, null, cb, ctx); + return; + } + + client.obtain(new GenericCallback<PerChannelBookieClient>() { + @Override + public void operationComplete(final int rc, PerChannelBookieClient pcbc) { + + if (rc != BKException.Code.OK) { + completeRead(rc, ledgerId, entryId, null, cb, ctx); + return; + } + pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, 
piggyBackEntry, cb, ctx); + } + }, ledgerId); + } finally { + closeLock.readLock().unlock(); + } + } public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) { closeLock.readLock().lock(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index ce5972e..1f20425 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -20,14 +20,18 @@ */ package org.apache.bookkeeper.proto; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import io.netty.channel.Channel; +import io.netty.util.HashedWheelTimer; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.AuthToken; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; +import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; @@ -49,6 +53,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_LONG import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_SCHEDULING_DELAY; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC_REQUEST; +import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAST_ENTRY_NOENTRY_ERROR; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC; import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO; @@ -79,6 +84,17 @@ public class BookieRequestProcessor implements RequestProcessor { */ private final OrderedSafeExecutor writeThreadPool; + /** + * The threadpool used to execute all long poll requests issued to this server + * after they are done waiting + */ + private final OrderedSafeExecutor longPollThreadPool; + + /** + * The Timer used to time out requests for long polling + */ + private final HashedWheelTimer requestTimer; + // Expose Stats private final BKStats bkStats = BKStats.getInstance(); private final boolean statsEnabled; @@ -94,6 +110,7 @@ public class BookieRequestProcessor implements RequestProcessor { final OpStatsLogger longPollWaitStats; final OpStatsLogger longPollReadStats; final OpStatsLogger longPollReadRequestStats; + final Counter readLastEntryNoEntryErrorCounter; final OpStatsLogger writeLacRequestStats; final OpStatsLogger writeLacStats; final OpStatsLogger readLacRequestStats; @@ -108,6 +125,15 @@ public class BookieRequestProcessor implements RequestProcessor { this.bookie = bookie; this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverCfg.getBookiePort()); this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverCfg.getBookiePort()); + this.longPollThreadPool = + createExecutor( + this.serverCfg.getNumLongPollWorkerThreads(), + "BookieLongPollThread-" + serverCfg.getBookiePort()); + this.requestTimer = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(), + this.serverCfg.getRequestTimerTickDurationMs(), + TimeUnit.MILLISECONDS, this.serverCfg.getRequestTimerNumTicks()); + // Expose Stats this.statsEnabled = serverCfg.isStatisticsEnabled(); this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY); @@ -122,6 +148,7 @@ public class 
BookieRequestProcessor implements RequestProcessor { this.longPollWaitStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_WAIT); this.longPollReadStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_READ); this.longPollReadRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_LONG_POLL_REQUEST); + this.readLastEntryNoEntryErrorCounter = statsLogger.getCounter(READ_LAST_ENTRY_NOENTRY_ERROR); this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC); this.writeLacRequestStats = statsLogger.getOpStatsLogger(WRITE_LAC_REQUEST); this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC); @@ -231,11 +258,29 @@ public class BookieRequestProcessor implements RequestProcessor { private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) { ExecutorService fenceThreadPool = null == readThreadPool ? null : readThreadPool.chooseThread(c); - ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool); - if (null == readThreadPool) { - read.run(); + ExecutorService lpThreadPool = + null == longPollThreadPool ? 
null : longPollThreadPool.chooseThread(c); + ReadEntryProcessorV3 read; + if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) { + read = new LongPollReadEntryProcessorV3( + r, + c, + this, + fenceThreadPool, + lpThreadPool, + requestTimer); + if (null == longPollThreadPool) { + read.run(); + } else { + longPollThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read); + } } else { - readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read); + read = new ReadEntryProcessorV3(r, c, this, fenceThreadPool); + if (null == readThreadPool) { + read.run(); + } else { + readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java new file mode 100644 index 0000000..342e788 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.proto; + +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import io.netty.channel.Channel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import java.io.IOException; +import java.util.Observable; +import java.util.Observer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; +import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; +import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Processor handling long poll read entry request. + */ +class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer { + + private final static Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class); + + private final Long previousLAC; + private Optional<Long> lastAddConfirmedUpdateTime = Optional.absent(); + + // long poll execution state + private final ExecutorService longPollThreadPool; + private final HashedWheelTimer requestTimer; + private Timeout expirationTimerTask = null; + private Future<?> deferredTask = null; + private boolean shouldReadEntry = false; + + LongPollReadEntryProcessorV3(Request request, + Channel channel, + BookieRequestProcessor requestProcessor, + ExecutorService fenceThreadPool, + ExecutorService longPollThreadPool, + HashedWheelTimer requestTimer) { + super(request, channel, requestProcessor, fenceThreadPool); + this.previousLAC = readRequest.getPreviousLAC(); + this.longPollThreadPool = longPollThreadPool; + this.requestTimer = requestTimer; + + } + + @Override + protected Long 
getPreviousLAC() { + return previousLAC; + } + + private synchronized boolean shouldReadEntry() { + return shouldReadEntry; + } + + @Override + protected ReadResponse readEntry(ReadResponse.Builder readResponseBuilder, + long entryId, + Stopwatch startTimeSw) + throws IOException { + if (RequestUtils.shouldPiggybackEntry(readRequest)) { + if(!readRequest.hasPreviousLAC() || (BookieProtocol.LAST_ADD_CONFIRMED != entryId)) { + // This is not a valid request - client bug? + logger.error("Incorrect read request, entry piggyback requested incorrectly for ledgerId {} entryId {}", + ledgerId, entryId); + return buildResponse(readResponseBuilder, StatusCode.EBADREQ, startTimeSw); + } else { + long knownLAC = requestProcessor.bookie.readLastAddConfirmed(ledgerId); + readResponseBuilder.setMaxLAC(knownLAC); + if (knownLAC > previousLAC) { + entryId = previousLAC + 1; + readResponseBuilder.setMaxLAC(knownLAC); + if (lastAddConfirmedUpdateTime.isPresent()) { + readResponseBuilder.setLacUpdateTimestamp(lastAddConfirmedUpdateTime.get()); + } + if (logger.isDebugEnabled()) { + logger.debug("ReadLAC Piggy Back reading entry:{} from ledger: {}", entryId, ledgerId); + } + try { + return super.readEntry(readResponseBuilder, entryId, true, startTimeSw); + } catch (Bookie.NoEntryException e) { + requestProcessor.readLastEntryNoEntryErrorCounter.inc(); + logger.info("No entry found while piggyback reading entry {} from ledger {} : previous lac = {}", + new Object[] { entryId, ledgerId, previousLAC }); + // piggy back is best effort and this request can fail genuinely because of striping + // entries across the ensemble + return buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw); + } + } else { + if (knownLAC < previousLAC) { + if (logger.isDebugEnabled()) { + logger.debug("Found smaller lac when piggy back reading lac and entry from ledger {} :" + + " previous lac = {}, known lac = {}", + new Object[]{ ledgerId, previousLAC, knownLAC }); + } + } + return 
buildResponse(readResponseBuilder, StatusCode.EOK, startTimeSw); + } + } + } else { + return super.readEntry(readResponseBuilder, entryId, false, startTimeSw); + } + } + + private ReadResponse buildErrorResponse(StatusCode statusCode, Stopwatch sw) { + ReadResponse.Builder builder = ReadResponse.newBuilder() + .setLedgerId(ledgerId) + .setEntryId(entryId); + return buildResponse(builder, statusCode, sw); + } + + private ReadResponse getLongPollReadResponse() { + if (!shouldReadEntry() && readRequest.hasTimeOut()) { + if (logger.isTraceEnabled()) { + logger.trace("Waiting For LAC Update {}", previousLAC); + } + + final Stopwatch startTimeSw = Stopwatch.createStarted(); + + final Observable observable; + try { + observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this); + } catch (Bookie.NoLedgerException e) { + logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.", + ledgerId, previousLAC); + return buildErrorResponse(StatusCode.ENOLEDGER, startTimeSw); + } catch (IOException ioe) { + logger.error("IOException while longpoll reading ledger {}, previous lac = {} : ", + new Object[] { ledgerId, previousLAC, ioe }); + return buildErrorResponse(StatusCode.EIO, startTimeSw); + } + + registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw); + lastPhaseStartTime.reset().start(); + + if (null != observable) { + // successfully registered observable to lac updates + if (logger.isTraceEnabled()) { + logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut()); + } + synchronized (this) { + expirationTimerTask = requestTimer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + // When the timeout expires just get whatever is the current + // readLastConfirmed + LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true); + } + }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS); + } + return null; + } + } 
+ // request doesn't have timeout or fail to wait, proceed to read entry + return getReadResponse(); + } + + @Override + protected void executeOp() { + ReadResponse readResponse = getLongPollReadResponse(); + if (null != readResponse) { + sendResponse(readResponse); + } + } + + @Override + public void update(Observable observable, Object o) { + LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification)o; + if (newLACNotification.lastAddConfirmed > previousLAC) { + if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE && + !lastAddConfirmedUpdateTime.isPresent()) { + lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp); + } + if (logger.isTraceEnabled()) { + logger.trace("Last Add Confirmed Advanced to {} for request {}", + newLACNotification.lastAddConfirmed, request); + } + scheduleDeferredRead(observable, false); + } + } + + private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) { + if (null == deferredTask) { + if (logger.isTraceEnabled()) { + logger.trace("Deferred Task, expired: {}, request: {}", timeout, request); + } + observable.deleteObserver(this); + try { + shouldReadEntry = true; + deferredTask = longPollThreadPool.submit(this); + } catch (RejectedExecutionException exc) { + // If the threadPool has been shutdown, simply drop the task + } + if (null != expirationTimerTask) { + expirationTimerTask.cancel(); + } + + registerEvent(timeout, requestProcessor.longPollWaitStats, lastPhaseStartTime); + lastPhaseStartTime.reset().start(); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 2ec567b..7ecb0b4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -66,6 +66,7 @@ import 
com.google.protobuf.ByteString; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.client.ReadLastConfirmedAndEntryOp; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -728,7 +729,36 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { } } - public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) { + /** + * Long Poll Reads + */ + public void readEntryWaitForLACUpdate(final long ledgerId, + final long entryId, + final long previousLAC, + final long timeOutInMillis, + final boolean piggyBackEntry, + ReadEntryCallback cb, + Object ctx) { + readEntryInternal(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb, ctx); + } + + /** + * Normal Reads. 
+ */ + public void readEntry(final long ledgerId, + final long entryId, + ReadEntryCallback cb, + Object ctx) { + readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx); + } + + private void readEntryInternal(final long ledgerId, + final long entryId, + final Long previousLAC, + final Long timeOutInMillis, + final boolean piggyBackEntry, + final ReadEntryCallback cb, + final Object ctx) { Object request = null; CompletionKey completion = null; if (useV2WireProtocol) { @@ -749,6 +779,30 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { .setLedgerId(ledgerId) .setEntryId(entryId); + if (null != previousLAC) { + readBuilder = readBuilder.setPreviousLAC(previousLAC); + } + + if (null != timeOutInMillis) { + // Long poll requires previousLAC + if (null == previousLAC) { + cb.readEntryComplete(BKException.Code.IncorrectParameterException, + ledgerId, entryId, null, ctx); + return; + } + readBuilder = readBuilder.setTimeOut(timeOutInMillis); + } + + if (piggyBackEntry) { + // Long poll requires previousLAC + if (null == previousLAC) { + cb.readEntryComplete(BKException.Code.IncorrectParameterException, + ledgerId, entryId, null, ctx); + return; + } + readBuilder = readBuilder.setFlag(ReadRequest.Flag.ENTRY_PIGGYBACK); + } + request = Request.newBuilder() .setHeader(headerBuilder) .setReadRequest(readBuilder) @@ -1210,7 +1264,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { if (readResponse.hasData()) { data = readResponse.getData(); } - handleReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID, completionValue); + handleReadResponse(ledgerId, entryId, status, data, INVALID_ENTRY_ID, -1L, completionValue); break; } default: @@ -1302,7 +1356,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { if (readResponse.hasMaxLAC()) { maxLAC = readResponse.getMaxLAC(); } - handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, 
completionValue); + long lacUpdateTimestamp = -1L; + if (readResponse.hasLacUpdateTimestamp()) { + lacUpdateTimestamp = readResponse.getLacUpdateTimestamp(); + } + handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp, completionValue); break; } case WRITE_LAC: { @@ -1416,6 +1474,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { StatusCode status, ByteBuf buffer, long maxLAC, // max known lac piggy-back from bookies + long lacUpdateTimestamp, // the timestamp when the lac is updated. CompletionValue completionValue) { // The completion value should always be an instance of a ReadCompletion object when we reach here. ReadCompletion rc = (ReadCompletion)completionValue; @@ -1440,6 +1499,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { if (maxLAC > INVALID_ENTRY_ID && (rc.ctx instanceof ReadEntryCallbackCtx)) { ((ReadEntryCallbackCtx) rc.ctx).setLastAddConfirmed(maxLAC); } + if (lacUpdateTimestamp > -1L && (rc.ctx instanceof ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext)) { + ((ReadLastConfirmedAndEntryOp.ReadLastConfirmedAndEntryContext) rc.ctx).setLacUpdateTimestamp(lacUpdateTimestamp); + } rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer, rc.ctx); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 6be898d..0febdc7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -24,6 +24,8 @@ import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; +import java.util.Observable; +import java.util.Observer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Callable; @@ -339,6 +341,11 @@ 
public class TestSyncThread { } @Override + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + return null; + } + + @Override public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { return checkpoint; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java new file mode 100644 index 0000000..96c5c08 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java @@ -0,0 +1,267 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client; + +import io.netty.buffer.ByteBuf; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class); + + final BookKeeper.DigestType digestType; + + public TestReadLastConfirmedAndEntry() { + super(3); + this.digestType = BookKeeper.DigestType.CRC32; + } + + static class FakeBookie extends Bookie { + + final long expectedEntryToFail; + final boolean stallOrRespondNull; + + public FakeBookie(ServerConfiguration conf, long expectedEntryToFail, boolean stallOrRespondNull) + throws InterruptedException, BookieException, KeeperException, IOException { + super(conf); + this.expectedEntryToFail = expectedEntryToFail; + this.stallOrRespondNull = stallOrRespondNull; + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId) + throws IOException, NoLedgerException { + if (entryId == expectedEntryToFail) { + if (stallOrRespondNull) { + try { + Thread.sleep(600000); + } catch (InterruptedException e) { + // ignore + } + } else { + throw new NoEntryException(ledgerId, entryId); + } + } + return 
super.readEntry(ledgerId, entryId); + } + } + + @Test(timeout = 60000) + public void testAdvancedLacWithEmptyResponse() throws Exception { + byte[] passwd = "advanced-lac-with-empty-response".getBytes(UTF_8); + + ClientConfiguration newConf = new ClientConfiguration(); + newConf.addConfiguration(baseClientConf); + newConf.setAddEntryTimeout(9999999); + newConf.setReadEntryTimeout(9999999); + + // stop existing bookies + stopAllBookies(); + // add fake bookies + long expectedEntryIdToFail = 2; + for (int i = 0; i < numBookies; i++) { + ServerConfiguration conf = newServerConfiguration(); + Bookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0); + bs.add(startBookie(conf, b)); + bsConfs.add(conf); + } + + // create bookkeeper + BookKeeper newBk = new BookKeeper(newConf); + // create ledger to write some data + LedgerHandle lh = newBk.createLedger(3, 3, 2, digestType, passwd); + for (int i = 0; i <= expectedEntryIdToFail; i++) { + lh.addEntry("test".getBytes(UTF_8)); + } + + // open ledger to tail reading + LedgerHandle newLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd); + long lac = newLh.readLastConfirmed(); + assertEquals(expectedEntryIdToFail - 1, lac); + Enumeration<LedgerEntry> entries = newLh.readEntries(0, lac); + + int numReads = 0; + long expectedEntryId = 0L; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + assertEquals(expectedEntryId++, entry.getEntryId()); + ++numReads; + } + assertEquals(lac + 1, numReads); + + final AtomicInteger rcHolder = new AtomicInteger(-12345); + final AtomicLong lacHolder = new AtomicLong(lac); + final AtomicReference<LedgerEntry> entryHolder = new AtomicReference<LedgerEntry>(null); + final CountDownLatch latch = new CountDownLatch(1); + + newLh.asyncReadLastConfirmedAndEntry(newLh.getLastAddConfirmed() + 1, 99999, false, + new AsyncCallback.ReadLastConfirmedAndEntryCallback() { + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, 
LedgerEntry entry, Object ctx) { + rcHolder.set(rc); + lacHolder.set(lastConfirmed); + entryHolder.set(entry); + latch.countDown(); + } + }, null); + + lh.addEntry("another test".getBytes(UTF_8)); + + latch.await(); + assertEquals(expectedEntryIdToFail, lacHolder.get()); + assertNull(entryHolder.get()); + assertEquals(BKException.Code.OK, rcHolder.get()); + } + + static class SlowReadLacBookie extends Bookie { + + private final long lacToSlowRead; + private final CountDownLatch readLatch; + + public SlowReadLacBookie(ServerConfiguration conf, + long lacToSlowRead, CountDownLatch readLatch) + throws IOException, KeeperException, InterruptedException, BookieException { + super(conf); + this.lacToSlowRead = lacToSlowRead; + this.readLatch = readLatch; + } + + @Override + public long readLastAddConfirmed(long ledgerId) throws IOException { + long lac = super.readLastAddConfirmed(ledgerId); + logger.info("Last Add Confirmed for ledger {} is {}", ledgerId, lac); + if (lacToSlowRead == lac) { + logger.info("Suspend returning lac {} for ledger {}", lac, ledgerId); + try { + readLatch.await(); + } catch (InterruptedException e) { + // no-op + } + } + return super.readLastAddConfirmed(ledgerId); + } + } + + static class ReadLastConfirmedAndEntryResult implements AsyncCallback.ReadLastConfirmedAndEntryCallback { + + int rc = -1234; + long lac = -1234L; + LedgerEntry entry = null; + final CountDownLatch doneLatch = new CountDownLatch(1); + + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) { + this.rc = rc; + this.lac = lastConfirmed; + this.entry = entry; + doneLatch.countDown(); + } + + void await() throws InterruptedException { + doneLatch.await(); + } + } + + @Test(timeout = 60000) + public void testRaceOnLastAddConfirmed() throws Exception { + byte[] passwd = "race-on-last-add-confirmed".getBytes(UTF_8); + + ClientConfiguration newConf = new ClientConfiguration(); + 
newConf.addConfiguration(baseClientConf); + newConf.setAddEntryTimeout(9999999); + newConf.setReadEntryTimeout(9999999); + + final long lacToSlowRead = 0L; + final CountDownLatch readLatch = new CountDownLatch(1); + + // stop first bookie + ServerConfiguration bsConf = killBookie(0); + // start it with a slow bookie + Bookie b = new SlowReadLacBookie(bsConf, lacToSlowRead, readLatch); + bs.add(startBookie(bsConf, b)); + bsConfs.add(bsConf); + + // create bookkeeper + BookKeeper newBk = new BookKeeper(newConf); + // create ledger + LedgerHandle lh = newBk.createLedger(3, 3, 3, digestType, passwd); + // 0) write entry 0 + lh.addEntry("entry-0".getBytes(UTF_8)); + + // open ledger to read + LedgerHandle readLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd); + + // 1) wait entry 0 to be committed + ReadLastConfirmedAndEntryResult readResult = new ReadLastConfirmedAndEntryResult(); + readLh.asyncReadLastConfirmedAndEntry(0L, 9999999, true, readResult, null); + + // 2) write entry 1 to commit entry 0 => lac = 0 + lh.addEntry("entry-1".getBytes(UTF_8)); + readResult.await(); + assertEquals(BKException.Code.OK, readResult.rc); + assertEquals(0L, readResult.lac); + assertEquals(0L, readResult.entry.getEntryId()); + assertEquals("entry-0", new String(readResult.entry.getEntry(), UTF_8)); + + // 3) write entry 2 to commit entry 1 => lac = 1 + lh.addEntry("entry-2".getBytes(UTF_8)); + // 4) count down read latch to trigger previous readLacAndEntry request + readLatch.countDown(); + // 5) due to piggyback, the lac is updated to lac = 1 + while (readLh.getLastAddConfirmed() < 1L) { + Thread.sleep(100); + } + // 6) write entry 3 to commit entry 2 => lac = 2 + lh.addEntry("entry-3".getBytes(UTF_8)); + // 7) readLastConfirmedAndEntry for next entry (we are expecting to read entry 1) + readResult = new ReadLastConfirmedAndEntryResult(); + readLh.asyncReadLastConfirmedAndEntry(1L, 9999999, true, readResult, null); + readResult.await(); + 
assertEquals(BKException.Code.OK, readResult.rc); + assertEquals(2L, readResult.lac); + assertEquals(1L, readResult.entry.getEntryId()); + assertEquals("entry-1", new String(readResult.entry.getEntry(), UTF_8)); + + lh.close(); + readLh.close(); + + newBk.close(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java new file mode 100644 index 0000000..094fe88 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase { + final DigestType digestType; + + public TestReadLastConfirmedLongPoll() { + super(6); + this.digestType = DigestType.CRC32; + } + + @Test(timeout = 60000) + public void testReadLACLongPollWhenAllBookiesUp() throws Exception { + final int numEntries = 3; + + final LedgerHandle lh = bkc.createLedger(3, 3, 1, digestType, "".getBytes()); + LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes()); + assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed()); + // add entries + for (int i = 0; i < (numEntries - 1); i++) { + lh.addEntry(("data" + i).getBytes()); + } + final AtomicBoolean success = new AtomicBoolean(false); + final AtomicInteger numCallbacks = new AtomicInteger(0); + final CountDownLatch firstReadComplete = new CountDownLatch(1); + readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() { + @Override + public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) { + numCallbacks.incrementAndGet(); + if (BKException.Code.OK == rc) { + success.set(true); + } else { + success.set(false); + } + firstReadComplete.countDown(); + } + }, null); + firstReadComplete.await(); + assertTrue(success.get()); + assertTrue(numCallbacks.get() == 1); + assertEquals(numEntries - 3, readLh.getLastAddConfirmed()); + // try read last confirmed again + success.set(false); + numCallbacks.set(0); + long entryId = 
readLh.getLastAddConfirmed()+1; + final CountDownLatch secondReadComplete = new CountDownLatch(1); + readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, true, new AsyncCallback.ReadLastConfirmedAndEntryCallback() { + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) { + numCallbacks.incrementAndGet(); + if (BKException.Code.OK == rc && lastConfirmed == (numEntries - 2)) { + success.set(true); + } else { + success.set(false); + } + secondReadComplete.countDown(); + } + }, null); + lh.addEntry(("data" + (numEntries - 1)).getBytes()); + secondReadComplete.await(); + assertTrue(success.get()); + assertTrue(numCallbacks.get() == 1); + assertEquals(numEntries - 2, readLh.getLastAddConfirmed()); + + success.set(false); + numCallbacks.set(0); + final CountDownLatch thirdReadComplete = new CountDownLatch(1); + readLh.asyncReadLastConfirmedAndEntry(entryId++, 1000, false, new AsyncCallback.ReadLastConfirmedAndEntryCallback() { + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) { + numCallbacks.incrementAndGet(); + if (BKException.Code.OK == rc && lastConfirmed == (numEntries - 1)) { + success.set(true); + } else { + success.set(false); + } + thirdReadComplete.countDown(); + } + }, null); + lh.addEntry(("data" + numEntries).getBytes()); + thirdReadComplete.await(); + assertTrue(success.get()); + assertTrue(numCallbacks.get() == 1); + assertEquals(numEntries - 1, readLh.getLastAddConfirmed()); + lh.close(); + readLh.close(); + } + + @Test(timeout = 60000) + public void testReadLACLongPollWhenSomeBookiesDown() throws Exception { + final int numEntries = 3; + final LedgerHandle lh = bkc.createLedger(3, 1, 1, digestType, "".getBytes()); + LedgerHandle readLh = bkc.openLedgerNoRecovery(lh.getId(), digestType, "".getBytes()); + assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed()); + // add entries + for (int i = 0; i < 
numEntries; i++) { + lh.addEntry(("data" + i).getBytes()); + } + for (int i = 0; i < numEntries; i++) { + ServerConfiguration[] confs = new ServerConfiguration[numEntries - 1]; + for (int j = 0; j < numEntries - 1; j++) { + int idx = (i + 1 + j) % numEntries; + confs[j] = killBookie(lh.getLedgerMetadata().currentEnsemble.get(idx)); + } + + final AtomicBoolean entryAsExpected = new AtomicBoolean(false); + final AtomicBoolean success = new AtomicBoolean(false); + final AtomicInteger numCallbacks = new AtomicInteger(0); + final CountDownLatch readComplete = new CountDownLatch(1); + final int entryId = i; + readLh.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() { + @Override + public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) { + numCallbacks.incrementAndGet(); + if (BKException.Code.OK == rc) { + success.set(true); + entryAsExpected.set(lastConfirmed == (entryId - 1)); + } else { + System.out.println("Return value" + rc); + success.set(false); + entryAsExpected.set(false); + } + readComplete.countDown(); + } + }, null); + readComplete.await(); + assertTrue(success.get()); + assertTrue(entryAsExpected.get()); + assertTrue(numCallbacks.get() == 1); + + lh.close(); + readLh.close(); + + // start the bookies + for (ServerConfiguration conf : confs) { + bs.add(startBookie(conf)); + bsConfs.add(conf); + } + } + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index 48b8bab..95a0506 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -38,6 +38,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Observable; +import java.util.Observer; import java.util.Queue; import java.util.Random; import 
java.util.Set; @@ -418,5 +420,10 @@ public class GcLedgersTest extends LedgerManagerTestCase { @Override public void flushEntriesLocationsIndex() throws IOException { } + + @Override + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + return null; + } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 747398a..c6f2a36 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -21,12 +21,14 @@ package org.apache.bookkeeper.meta; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.NavigableMap; - +import java.util.Observable; +import java.util.Observer; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.CheckpointSource; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; @@ -43,17 +45,12 @@ import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.buffer.ByteBuf; /** * Test case to run over several ledger managers */ @RunWith(Parameterized.class) public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { - static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class); protected LedgerManagerFactory ledgerManagerFactory; protected LedgerManager ledgerManager = null; @@ -208,14 +205,16 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { } @Override - public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { - // TODO 
Auto-generated method stub + public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + return null; + } + @Override + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { } @Override public ByteBuf getExplicitLac(long ledgerId) { - // TODO Auto-generated method stub return null; } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 9106841..a2f66bb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -214,6 +214,19 @@ public abstract class BookKeeperClusterTestCase { return conf; } + protected void stopAllBookies() throws Exception { + for (BookieServer server : bs) { + server.shutdown(); + } + bs.clear(); + } + + protected void startAllBookies() throws Exception { + for (ServerConfiguration conf : bsConfs) { + bs.add(startBookie(conf)); + } + } + /** * Get bookie address for bookie at index */ -- To stop receiving notification emails like this one, please contact "commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>.