Repository: bookkeeper Updated Branches: refs/heads/master 5d43260e8 -> 0f81461d2
BOOKKEEPER-1048: Use ByteBuf in LedgerStorage interface To pass ref-counted buffer from Netty directly to the storage and the Journal, we need to have LedgerStorage to accept ByteBuf instead of ByteBuffer #### Note This commit is on top of BOOKKEEPER-1048 / #138. Once that gets merged, I will rebase. Posting now to get Jenkins run. Please review last commit f53f772f79d0a334edc0f05e66edb7cc645b1ffa in this PR for now. Author: Matteo Merli <[email protected]> Reviewers: Jia Zhai <None>, Sijie Guo <None> Closes #139 from merlimat/bytebuf-in-ledger-storage Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0f81461d Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0f81461d Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0f81461d Branch: refs/heads/master Commit: 0f81461d2d1dc5cf9db4de9a46599d7d64e3dac6 Parents: 5d43260 Author: Matteo Merli <[email protected]> Authored: Mon May 15 15:31:56 2017 -0700 Committer: Matteo Merli <[email protected]> Committed: Mon May 15 15:31:56 2017 -0700 ---------------------------------------------------------------------- .../org/apache/bookkeeper/bookie/Bookie.java | 61 +++++++++++--------- .../apache/bookkeeper/bookie/EntryKeyValue.java | 7 ++- .../apache/bookkeeper/bookie/EntryMemTable.java | 2 +- .../org/apache/bookkeeper/bookie/FileInfo.java | 21 ++++--- .../bookkeeper/bookie/IndexPersistenceMgr.java | 6 +- .../bookie/InterleavedLedgerStorage.java | 33 ++++++----- .../org/apache/bookkeeper/bookie/Journal.java | 33 +++++++---- .../apache/bookkeeper/bookie/LedgerCache.java | 6 +- .../bookkeeper/bookie/LedgerCacheImpl.java | 6 +- .../bookkeeper/bookie/LedgerDescriptor.java | 15 ++--- .../bookkeeper/bookie/LedgerDescriptorImpl.java | 17 +++--- .../bookie/LedgerDescriptorReadOnlyImpl.java | 5 +- .../apache/bookkeeper/bookie/LedgerStorage.java | 12 ++-- .../bookkeeper/bookie/SortedLedgerStorage.java | 21 ++++--- .../bookkeeper/proto/BookieProtoEncoding.java | 35 +++++------ .../apache/bookkeeper/proto/BookieProtocol.java | 9 +-- .../bookkeeper/proto/ReadEntryProcessor.java | 9 ++- .../bookkeeper/proto/ReadEntryProcessorV3.java | 11 ++-- .../bookkeeper/proto/ReadLacProcessorV3.java | 15 +++-- .../bookkeeper/proto/ResponseBuilder.java | 8 +-- .../bookkeeper/proto/WriteEntryProcessor.java | 6 +- .../bookkeeper/proto/WriteEntryProcessorV3.java | 5 +- .../bookkeeper/proto/WriteLacProcessorV3.java | 3 +- .../bookkeeper/bookie/BookieJournalTest.java | 15 ++--- .../bookkeeper/bookie/CompactionTest.java | 18 +++--- .../apache/bookkeeper/bookie/EntryLogTest.java | 42 +++++++------- .../bookkeeper/bookie/LedgerCacheTest.java | 20 ++++--- .../bookkeeper/bookie/TestSyncThread.java | 17 +++--- .../bookkeeper/client/BookKeeperCloseTest.java | 9 ++- .../bookkeeper/client/LedgerCloseTest.java | 8 ++- .../bookkeeper/client/LedgerRecoveryTest.java | 10 ++-- .../apache/bookkeeper/meta/GcLedgersTest.java | 12 ++-- .../bookkeeper/meta/LedgerManagerTestCase.java | 12 ++-- .../proto/TestPerChannelBookieClient.java | 2 +- .../replication/AuditorPeriodicCheckTest.java | 4 +- .../bookkeeper/test/ConcurrentLedgerTest.java | 15 +++-- 36 files changed, 292 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index c743ef4..090574c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -22,6 +22,8 @@ package org.apache.bookkeeper.bookie; import static com.google.common.base.Charsets.UTF_8; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.io.File; import java.io.FileNotFoundException; @@ -49,8 +51,10 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirExcepti import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.jmx.BKMBeanRegistry; + import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerManagerFactory; + import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -68,6 +72,7 @@ import org.apache.bookkeeper.versioning.Versioned; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; import org.apache.commons.io.FileUtils; + import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -75,6 +80,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; + import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -484,7 +490,7 @@ public class Bookie extends BookieCriticalThread { Versioned<Cookie> zkCookie = null; try { zkCookie = Cookie.readFromZooKeeper(zk, conf); - // If allowStorageExpansion option is set, we should + // If allowStorageExpansion option is set, we should // make sure that the new set of ledger/index dirs // is a super set of the old; else, we fail the cookie check masterCookie.verifyIsSuperSet(zkCookie.getValue()); @@ -773,7 +779,7 @@ public class Bookie extends BookieCriticalThread { LedgerDescriptor handle = handles.getHandle(ledgerId, key); recBuff.rewind(); - handle.addEntry(recBuff); + handle.addEntry(Unpooled.wrappedBuffer(recBuff)); } } catch (NoLedgerException nsle) { LOG.debug("Skip replaying entries of ledger {} since it was deleted.", ledgerId); @@ -1346,9 +1352,10 @@ public class Bookie extends BookieCriticalThread { * * @throws BookieException if masterKey does not match the master key of the ledger */ - private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, final byte[] masterKey) + private LedgerDescriptor getLedgerForEntry(ByteBuf entry, final byte[] masterKey) throws IOException, BookieException { - final long ledgerId = entry.getLong(); + final long ledgerId = entry.getLong(entry.readerIndex()); + LedgerDescriptor l = handles.getHandle(ledgerId, masterKey); if (masterKeyCache.get(ledgerId) == null) { // Force the load into masterKey cache @@ -1376,16 +1383,16 @@ public class Bookie extends BookieCriticalThread { /** * Add an entry to a ledger as specified by handle. */ - private void addEntryInternal(LedgerDescriptor handle, ByteBuffer entry, WriteCallback cb, Object ctx) + private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry, WriteCallback cb, Object ctx) throws IOException, BookieException { long ledgerId = handle.getLedgerId(); - entry.rewind(); long entryId = handle.addEntry(entry); - entry.rewind(); - writeBytes.add(entry.remaining()); + writeBytes.add(entry.readableBytes()); - LOG.trace("Adding {}@{}", entryId, ledgerId); + if (LOG.isTraceEnabled()) { + LOG.trace("Adding {}@{}", entryId, ledgerId); + } getJournal(ledgerId).logAddEntry(entry, cb, ctx); } @@ -1395,7 +1402,7 @@ public class Bookie extends BookieCriticalThread { * so that they exist on a quorum of bookies. The corresponding client side call for this * is not exposed to users. */ - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { long requestNanos = MathUtils.nowInNano(); boolean success = false; @@ -1403,7 +1410,7 @@ public class Bookie extends BookieCriticalThread { try { LedgerDescriptor handle = getLedgerForEntry(entry, masterKey); synchronized (handle) { - entrySize = entry.remaining(); + entrySize = entry.readableBytes(); addEntryInternal(handle, entry, cb, ctx); } success = true; @@ -1419,15 +1426,16 @@ public class Bookie extends BookieCriticalThread { recoveryAddEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS); addBytesStats.registerFailedValue(entrySize); } + + entry.release(); } } - public void setExplicitLac(ByteBuffer entry, Object ctx, byte[] masterKey) + public void setExplicitLac(ByteBuf entry, Object ctx, byte[] masterKey) throws IOException, BookieException { try { - long ledgerId = entry.getLong(); + long ledgerId = entry.getLong(entry.readerIndex()); LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey); - entry.rewind(); synchronized (handle) { handle.setExplicitLac(entry); } @@ -1437,8 +1445,8 @@ public class Bookie extends BookieCriticalThread { } } - public ByteBuffer getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException { - ByteBuffer lac; + public ByteBuf getExplicitLac(long ledgerId) throws IOException, Bookie.NoLedgerException { + ByteBuf lac; LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); synchronized (handle) { lac = handle.getExplicitLac(); @@ -1450,7 +1458,7 @@ public class Bookie extends BookieCriticalThread { * Add entry to a ledger. * @throws BookieException.LedgerFencedException if the ledger is fenced */ - public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { long requestNanos = MathUtils.nowInNano(); boolean success = false; @@ -1462,7 +1470,7 @@ public class Bookie extends BookieCriticalThread { throw BookieException .create(BookieException.Code.LedgerFencedException); } - entrySize = entry.remaining(); + entrySize = entry.readableBytes(); addEntryInternal(handle, entry, cb, ctx); } success = true; @@ -1478,6 +1486,8 @@ public class Bookie extends BookieCriticalThread { addEntryStats.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS); addBytesStats.registerFailedValue(entrySize); } + + entry.release(); } } @@ -1511,7 +1521,7 @@ public class Bookie extends BookieCriticalThread { } } - public ByteBuffer readEntry(long ledgerId, long entryId) + public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException { long requestNanos = MathUtils.nowInNano(); boolean success = false; @@ -1519,9 +1529,8 @@ public class Bookie extends BookieCriticalThread { try { LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); LOG.trace("Reading {}@{}", entryId, ledgerId); - ByteBuffer entry = handle.readEntry(entryId); - entrySize = entry.remaining(); - readBytes.add(entrySize); + ByteBuf entry = handle.readEntry(entryId); + readBytes.add(entry.readableBytes()); success = true; return entry; } finally { @@ -1667,11 +1676,9 @@ public class Bookie extends BookieCriticalThread { CounterCallback cb = new CounterCallback(); long start = MathUtils.now(); for (int i = 0; i < 100000; i++) { - ByteBuffer buff = ByteBuffer.allocate(1024); - buff.putLong(1); - buff.putLong(i); - buff.limit(1024); - buff.position(0); + ByteBuf buff = Unpooled.buffer(1024); + buff.writeLong(1); + buff.writeLong(i); cb.incCount(); b.addEntry(buff, cb, null, new byte[0]); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java index dab5396..42a1f34 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryKeyValue.java @@ -19,6 +19,9 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.nio.ByteBuffer; /** @@ -82,8 +85,8 @@ public class EntryKeyValue extends EntryKey { * * @return the value */ - public ByteBuffer getValueAsByteBuffer() { - return ByteBuffer.wrap(getBuffer(), getOffset(), getLength()); + public ByteBuf getValueAsByteBuffer() { + return Unpooled.wrappedBuffer(getBuffer(), getOffset(), getLength()); } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java index ff14d03..214b286 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java @@ -251,7 +251,7 @@ public class EntryMemTable { ledger = kv.getLedgerId(); if (ledgerGC != ledger) { try { - flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer()); + flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer().nioBuffer()); } catch (NoLedgerException exception) { ledgerGC = ledger; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index adf148c..90f731a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + /** * This is the file handle for a ledger's index file that maps entry ids to location. * It is used by LedgerCache. @@ -114,31 +117,31 @@ class FileInfo { return sizeSinceLastwrite; } - public ByteBuffer getExplicitLac() { - ByteBuffer retLac = null; + public ByteBuf getExplicitLac() { + ByteBuf retLac = null; synchronized(this) { LOG.debug("fileInfo:GetLac: {}", explicitLac); if (explicitLac != null) { - retLac = ByteBuffer.allocate(explicitLac.capacity()); + retLac = Unpooled.buffer(explicitLac.capacity()); explicitLac.rewind();//copy from the beginning - retLac.put(explicitLac); + retLac.writeBytes(explicitLac); explicitLac.rewind(); - retLac.flip(); + return retLac; } } return retLac; } - public void setExplicitLac(ByteBuffer lac) { + public void setExplicitLac(ByteBuf lac) { synchronized(this) { if (explicitLac == null) { explicitLac = ByteBuffer.allocate(lac.capacity()); } - explicitLac.put(lac); + lac.readBytes(explicitLac); explicitLac.rewind(); - + // skip the ledger id - explicitLac.getLong(); + explicitLac.getLong(); long explicitLacValue = explicitLac.getLong(); setLastAddConfirmed(explicitLacValue); explicitLac.rewind(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 1ea000c..708bdb3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -43,6 +43,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.ByteBuf; + import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_NUM_EVICTED_LEDGERS; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OPEN_LEDGERS; @@ -389,7 +391,7 @@ public class IndexPersistenceMgr { } } - void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException { + void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException { FileInfo fi = null; try { fi = getFileInfo(ledgerId, null); @@ -402,7 +404,7 @@ public class IndexPersistenceMgr { } } - public ByteBuffer getExplicitLac(long ledgerId) { + public ByteBuf getExplicitLac(long ledgerId) { FileInfo fi = null; try { fi = getFileInfo(ledgerId, null); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index afd65dc..61a81c4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -21,6 +21,9 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -219,11 +222,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry return ledgerCache.isFenced(ledgerId); } - public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException { + @Override + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { ledgerCache.setExplicitLac(ledgerId, lac); } - public ByteBuffer getExplicitLac(long ledgerId) { + @Override + public ByteBuf getExplicitLac(long ledgerId) { return ledgerCache.getExplicitLac(ledgerId); } @@ -246,13 +251,13 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry public long getLastAddConfirmed(long ledgerId) throws IOException { Long lac = ledgerCache.getLastAddConfirmed(ledgerId); if (lac == null) { - ByteBuffer bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); + ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); if (null == bb) { return BookieProtocol.INVALID_ENTRY_ID; } else { - bb.getLong(); // ledger id - bb.getLong(); // entry id - lac = bb.getLong(); + bb.readLong(); // ledger id + bb.readLong(); // entry id + lac = bb.readLong(); lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac); } } @@ -260,21 +265,19 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry } @Override - synchronized public long addEntry(ByteBuffer entry) throws IOException { - long ledgerId = entry.getLong(); - long entryId = entry.getLong(); - long lac = entry.getLong(); - entry.rewind(); + synchronized public long addEntry(ByteBuf entry) throws IOException { + long ledgerId = entry.getLong(entry.readerIndex() + 0); + long entryId = entry.getLong(entry.readerIndex() + 8); + long lac = entry.getLong(entry.readerIndex() + 16); - processEntry(ledgerId, entryId, entry); + processEntry(ledgerId, entryId, entry.nioBuffer()); ledgerCache.updateLastAddConfirmed(ledgerId, lac); - return entryId; } @Override - public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { long offset; /* * If entryId is BookieProtocol.LAST_ADD_CONFIRMED, then return the last written. @@ -305,7 +308,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry try { byte[] retBytes = entryLogger.readEntry(ledgerId, entryId, offset); success = true; - return ByteBuffer.wrap(retBytes); + return Unpooled.wrappedBuffer(retBytes); } finally { if (success) { getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index a39c3fa..c679ee9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -21,6 +21,9 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -36,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import com.google.common.base.Stopwatch; + import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -272,14 +276,14 @@ class Journal extends BookieCriticalThread implements CheckpointSource { * Journal Entry to Record */ private class QueueEntry implements Runnable { - ByteBuffer entry; + ByteBuf entry; long ledgerId; long entryId; WriteCallback cb; Object ctx; long enqueueTime; - QueueEntry(ByteBuffer entry, long ledgerId, long entryId, + QueueEntry(ByteBuf entry, long ledgerId, long entryId, WriteCallback cb, Object ctx, long enqueueTime) { this.entry = entry.duplicate(); this.cb = cb; @@ -750,14 +754,20 @@ class Journal extends BookieCriticalThread implements CheckpointSource { } } + public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) { + logAddEntry(Unpooled.wrappedBuffer(entry), cb, ctx); + } + /** * record an add entry operation in journal */ - public void logAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx) { - long ledgerId = entry.getLong(); - long entryId = entry.getLong(); - entry.rewind(); + public void logAddEntry(ByteBuf entry, WriteCallback cb, Object ctx) { + long ledgerId = entry.getLong(entry.readerIndex() + 0); + long entryId = entry.getLong(entry.readerIndex() + 8); journalQueueSize.inc(); + + //Retain entry until it gets written to journal + entry.retain(); queue.add(new QueueEntry(entry, ledgerId, entryId, cb, ctx, MathUtils.nowInNano())); } @@ -927,24 +937,25 @@ class Journal extends BookieCriticalThread implements CheckpointSource { continue; } - journalWriteBytes.add(qe.entry.remaining()); + journalWriteBytes.add(qe.entry.readableBytes()); journalQueueSize.dec(); - batchSize += (4 + qe.entry.remaining()); + batchSize += (4 + qe.entry.readableBytes()); lenBuff.clear(); - lenBuff.putInt(qe.entry.remaining()); + lenBuff.putInt(qe.entry.readableBytes()); lenBuff.flip(); // preAlloc based on size - logFile.preAllocIfNeeded(4 + qe.entry.remaining()); + logFile.preAllocIfNeeded(4 + qe.entry.readableBytes()); // // we should be doing the following, but then we run out of // direct byte buffers // logFile.write(new ByteBuffer[] { lenBuff, qe.entry }); bc.write(lenBuff); - bc.write(qe.entry); + bc.write(qe.entry.nioBuffer()); + qe.entry.release(); toFlush.add(qe); qe = null; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java index e004cb6..efb67dc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java @@ -25,6 +25,8 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; + /** * This class maps a ledger entry number into a location (entrylogid, offset) in * an entry log file. It does user level caching to more efficiently manage disk @@ -51,6 +53,6 @@ interface LedgerCache extends Closeable { void deleteLedger(long ledgerId) throws IOException; LedgerCacheBean getJMXBean(); - void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException; - ByteBuffer getExplicitLac(long ledgerId); + void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException; + ByteBuf getExplicitLac(long ledgerId); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java index cece79f..515b64b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java @@ -31,6 +31,8 @@ import org.apache.bookkeeper.util.SnapshotMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + /** * Implementation of LedgerCache interface. * This class serves two purposes. @@ -137,11 +139,11 @@ public class LedgerCacheImpl implements LedgerCache { return indexPersistenceManager.isFenced(ledgerId); } - public void setExplicitLac(long ledgerId, ByteBuffer lac) throws IOException { + public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException { indexPersistenceManager.setExplicitLac(ledgerId, lac); } - public ByteBuffer getExplicitLac(long ledgerId) { + public ByteBuf getExplicitLac(long ledgerId) { return indexPersistenceManager.getExplicitLac(ledgerId); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index bcb0c30..9fe1629 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -21,12 +21,9 @@ package org.apache.bookkeeper.bookie; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; +import io.netty.buffer.ByteBuf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; /** * Implements a ledger inside a bookie. In particular, it implements operations @@ -57,12 +54,12 @@ public abstract class LedgerDescriptor { abstract boolean setFenced() throws IOException; abstract boolean isFenced() throws IOException; - abstract long addEntry(ByteBuffer entry) throws IOException; - abstract ByteBuffer readEntry(long entryId) throws IOException; + abstract long addEntry(ByteBuf entry) throws IOException; + abstract ByteBuf readEntry(long entryId) throws IOException; abstract long getLastAddConfirmed() throws IOException; - abstract void setExplicitLac(ByteBuffer entry) throws IOException; + abstract void setExplicitLac(ByteBuf entry) throws IOException; - abstract ByteBuffer getExplicitLac(); + abstract ByteBuf getExplicitLac(); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index bf1c129..6602392 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -21,8 +21,10 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import org.slf4j.Logger; @@ -69,28 +71,27 @@ public class LedgerDescriptorImpl extends LedgerDescriptor { } @Override - void setExplicitLac(ByteBuffer lac) throws IOException { + void setExplicitLac(ByteBuf lac) throws IOException { ledgerStorage.setExplicitlac(ledgerId, lac); } @Override - ByteBuffer getExplicitLac() { + ByteBuf getExplicitLac() { return ledgerStorage.getExplicitLac(ledgerId); } - @Override - long addEntry(ByteBuffer entry) throws IOException { - long ledgerId = entry.getLong(); + + long addEntry(ByteBuf entry) throws IOException { + long ledgerId = entry.getLong(entry.readerIndex()); if (ledgerId != this.ledgerId) { throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId); } - entry.rewind(); return ledgerStorage.addEntry(entry); } @Override - ByteBuffer readEntry(long entryId) throws IOException { + ByteBuf readEntry(long entryId) throws IOException { return ledgerStorage.getEntry(ledgerId, entryId); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java index 29dcfaf..40bf988 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorReadOnlyImpl.java @@ -21,8 +21,9 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; + import java.io.IOException; -import java.nio.ByteBuffer; /** * Implements a ledger inside a bookie. In particular, it implements operations * to write entries to a ledger and read entries from a ledger. @@ -39,7 +40,7 @@ public class LedgerDescriptorReadOnlyImpl extends LedgerDescriptorImpl { } @Override - long addEntry(ByteBuffer entry) throws IOException { + long addEntry(ByteBuf entry) throws IOException { assert false; throw new IOException("Invalid action on read only descriptor"); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index fbdd6b9..4587460 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -21,13 +21,13 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; + import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; - import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager; @@ -101,12 +101,12 @@ public interface LedgerStorage { * Add an entry to the storage. * @return the entry id of the entry added */ - long addEntry(ByteBuffer entry) throws IOException; + long addEntry(ByteBuf entry) throws IOException; /** * Read an entry from storage */ - ByteBuffer getEntry(long ledgerId, long entryId) throws IOException; + ByteBuf getEntry(long ledgerId, long entryId) throws IOException; /** * Get last add confirmed. @@ -162,7 +162,7 @@ public interface LedgerStorage { */ BKMBeanInfo getJMXBean(); - void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException; + void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException; - ByteBuffer getExplicitLac(long ledgerId); + ByteBuf getExplicitLac(long ledgerId); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java index 698dbd3..105a8b5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java @@ -20,6 +20,8 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.Executors; @@ -27,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; @@ -93,12 +96,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage } @Override - public long addEntry(ByteBuffer entry) throws IOException { - long ledgerId = entry.getLong(); - long entryId = entry.getLong(); - long lac = entry.getLong(); - entry.rewind(); - memTable.addEntry(ledgerId, entryId, entry, this); + public long addEntry(ByteBuf entry) throws IOException { + long ledgerId = entry.getLong(entry.readerIndex() + 0); + long entryId = entry.getLong(entry.readerIndex() + 8); + long lac = entry.getLong(entry.readerIndex() + 16); + + memTable.addEntry(ledgerId, entryId, entry.nioBuffer(), this); ledgerCache.updateLastAddConfirmed(ledgerId, lac); return entryId; } @@ -108,7 +111,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage * @param ledgerId * @return */ - private ByteBuffer getLastEntryId(long ledgerId) throws IOException { + private ByteBuf getLastEntryId(long ledgerId) throws IOException { EntryKeyValue kv = memTable.getLastEntry(ledgerId); if (null != kv) { return kv.getValueAsByteBuffer(); @@ -118,11 +121,11 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage } @Override - public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { return getLastEntryId(ledgerId); } - ByteBuffer buffToRet; + ByteBuf buffToRet; try { buffToRet = super.getEntry(ledgerId, entryId); } catch (Bookie.NoEntryException nee) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 148b31d..b1f86ae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -20,17 +20,6 @@ */ package org.apache.bookkeeper.proto; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.codec.MessageToMessageEncoder; - import java.io.IOException; import java.util.List; @@ -44,6 +33,17 @@ import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.MessageToMessageEncoder; + public class BookieProtoEncoding { private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class); @@ -146,10 +146,8 @@ public class BookieProtoEncoding { packet.readBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH); // Read ledger and entry id without advancing the reader index - packet.markReaderIndex(); - ledgerId = packet.readLong(); - entryId = packet.readLong(); - packet.resetReaderIndex(); + ledgerId = packet.getLong(packet.readerIndex()); + entryId = packet.getLong(packet.readerIndex() + 8); return new BookieProtocol.AddRequest(h.getVersion(), ledgerId, entryId, flags, masterKey, packet.retain()); case BookieProtocol.READENTRY: ledgerId = packet.readLong(); @@ -169,6 +167,7 @@ public class BookieProtoEncoding { builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry); return new BookieProtocol.AuthRequest(h.getVersion(), builder.build()); } + return packet; } } @@ -238,8 +237,9 @@ public class BookieProtoEncoding { entryId = buffer.readLong(); if (rc == BookieProtocol.EOK) { + ByteBuf content = buffer.slice(); return new BookieProtocol.ReadResponse(header.getVersion(), rc, - ledgerId, entryId, buffer.slice()); + ledgerId, entryId, content.retain()); } else { return new BookieProtocol.ReadResponse(header.getVersion(), rc, ledgerId, entryId); @@ -327,9 +327,6 @@ public class BookieProtoEncoding { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Encode request {} to channel {}.", msg, ctx.channel()); - } if (msg instanceof BookkeeperProtocol.Request) { out.add(REQ_V3.encode(msg, ctx.alloc())); } else if (msg instanceof BookieProtocol.Request) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 094daab..f0cfa58 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -22,8 +22,7 @@ package org.apache.bookkeeper.proto; */ import io.netty.buffer.ByteBuf; - -import java.nio.ByteBuffer; +import io.netty.buffer.Unpooled; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; @@ -262,10 +261,6 @@ public interface BookieProtocol { return data; } - ByteBuffer getDataAsByteBuffer() { - return data.nioBuffer().slice(); - } - boolean isRecoveryAdd() { return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD; } @@ -351,7 +346,7 @@ public interface BookieProtocol { ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); - this.data = null; + this.data = Unpooled.EMPTY_BUFFER; } ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index bd98374..02fa1c0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -17,10 +17,11 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -49,7 +50,7 @@ class ReadEntryProcessor extends PacketProcessorBase { LOG.debug("Received new read request: {}", request); int errorCode = BookieProtocol.EIO; long startTimeNanos = MathUtils.nowInNano(); - ByteBuffer data = null; + ByteBuf data = null; try { Future<Boolean> fenceResult = null; if (read.isFencingRequest()) { @@ -63,7 +64,7 @@ class ReadEntryProcessor extends PacketProcessorBase { } } data = requestProcessor.bookie.readEntry(request.getLedgerId(), request.getEntryId()); - LOG.debug("##### Read entry ##### {}", data.remaining()); + LOG.debug("##### Read entry ##### {} -- ref-count: {}", data.readableBytes(), data.refCnt()); if (null != fenceResult) { // TODO: // currently we don't have readCallback to run in separated read @@ -127,6 +128,8 @@ class ReadEntryProcessor extends PacketProcessorBase { requestProcessor.readRequestStats); } else { + ReferenceCountUtil.release(data); + requestProcessor.readEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); sendResponse(errorCode, ResponseBuilder.buildErrorResponse(errorCode, read), http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index fbfa71f..b04f6b9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -17,10 +17,11 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -65,7 +66,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { LOG.debug("Received new read request: {}", request); StatusCode status; - ByteBuffer entryBody; + ByteBuf entryBody = null; try { Future<Boolean> fenceResult = null; if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) { @@ -98,7 +99,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { status = StatusCode.EIO; } else { status = StatusCode.EOK; - readResponse.setBody(ByteString.copyFrom(entryBody)); + readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer())); } } catch (InterruptedException ie) { LOG.error("Interrupting fence read entry (lid: {}, eid: {})", @@ -114,7 +115,7 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { status = StatusCode.EIO; } } else { - readResponse.setBody(ByteString.copyFrom(entryBody)); + readResponse.setBody(ByteString.copyFrom(entryBody.nioBuffer())); status = StatusCode.EOK; } } catch (Bookie.NoLedgerException e) { @@ -142,6 +143,8 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { TimeUnit.NANOSECONDS); } + ReferenceCountUtil.release(entryBody); + // Finally set status and return. The body would have been updated if // a read went through. readResponse.setStatus(status); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java index 0fbdbec..e9cc1cb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLacProcessorV3.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.proto; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; @@ -36,7 +35,9 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.util.ReferenceCountUtil; class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { private final static Logger logger = LoggerFactory.getLogger(ReadLacProcessorV3.class); @@ -61,14 +62,14 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { logger.debug("Received ReadLac request: {}", request); StatusCode status = StatusCode.EOK; - ByteBuffer lastEntry; - ByteBuffer lac; + ByteBuf lastEntry = null; + ByteBuf lac = null; try { lastEntry = requestProcessor.bookie.readEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED); lac = requestProcessor.bookie.getExplicitLac(ledgerId); if (lac != null) { - readLacResponse.setLacBody(ByteString.copyFrom(lac)); - readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry)); + readLacResponse.setLacBody(ByteString.copyFrom(lac.nioBuffer())); + readLacResponse.setLastEntryBody(ByteString.copyFrom(lastEntry.nioBuffer())); } else { status = StatusCode.ENOENTRY; } @@ -78,7 +79,11 @@ class ReadLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { } catch (IOException e) { status = StatusCode.EIO; logger.error("IOException while performing readLac from ledger: {}", ledgerId); + } finally { + ReferenceCountUtil.release(lastEntry); + ReferenceCountUtil.release(lac); } + if (status == StatusCode.EOK) { requestProcessor.readLacStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index 1418437..c0be162 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -20,9 +20,7 @@ */ package org.apache.bookkeeper.proto; -import io.netty.buffer.Unpooled; - -import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; class ResponseBuilder { static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) { @@ -41,8 +39,8 @@ class ResponseBuilder { r.getEntryId()); } - static BookieProtocol.Response buildReadResponse(ByteBuffer data, BookieProtocol.Request r) { + static BookieProtocol.Response buildReadResponse(ByteBuf data, BookieProtocol.Request r) { return new BookieProtocol.ReadResponse(r.getProtocolVersion(), BookieProtocol.EOK, - r.getLedgerId(), r.getEntryId(), Unpooled.wrappedBuffer(data)); + r.getLedgerId(), r.getEntryId(), data); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 46f7f7d..827aed9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -62,11 +62,9 @@ class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback { int rc = BookieProtocol.EOK; try { if (add.isRecoveryAdd()) { - requestProcessor.bookie.recoveryAddEntry(add.getDataAsByteBuffer(), - this, channel, add.getMasterKey()); + requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey()); } else { - requestProcessor.bookie.addEntry(add.getDataAsByteBuffer(), - this, channel, add.getMasterKey()); + requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey()); } } catch (IOException e) { LOG.error("Error writing " + add, e); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index e34e894..b4e89f8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -20,10 +20,11 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.BookieException; @@ -102,7 +103,7 @@ class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { }; StatusCode status = null; byte[] masterKey = addRequest.getMasterKey().toByteArray(); - ByteBuffer entryToAdd = addRequest.getBody().asReadOnlyByteBuffer(); + ByteBuf entryToAdd = Unpooled.wrappedBuffer(addRequest.getBody().asReadOnlyByteBuffer()); try { if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) { requestProcessor.bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java index 097a573..e8ffb34 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { @@ -69,7 +70,7 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 implements Runnable { byte[] masterKey = writeLacRequest.getMasterKey().toByteArray(); try { - requestProcessor.bookie.setExplicitLac(lacToAdd, channel, masterKey); + requestProcessor.bookie.setExplicitLac(Unpooled.wrappedBuffer(lacToAdd), channel, masterKey); status = StatusCode.EOK; } catch (IOException e) { logger.error("Error saving lac for ledger:{}", http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java index 1dfa32e..711d38b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java @@ -47,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.Test; import org.junit.After; + import static org.junit.Assert.*; public class BookieJournalTest { @@ -620,15 +621,15 @@ public class BookieJournalTest { b.readEntry(1, 99); // still able to read last entry, but it's junk - ByteBuffer buf = b.readEntry(1, 100); - assertEquals("Ledger Id is wrong", buf.getLong(), 1); - assertEquals("Entry Id is wrong", buf.getLong(), 100); - assertEquals("Last confirmed is wrong", buf.getLong(), 99); - assertEquals("Length is wrong", buf.getLong(), 100*1024); - buf.getLong(); // skip checksum + ByteBuf buf = b.readEntry(1, 100); + assertEquals("Ledger Id is wrong", buf.readLong(), 1); + assertEquals("Entry Id is wrong", buf.readLong(), 100); + assertEquals("Last confirmed is wrong", buf.readLong(), 99); + assertEquals("Length is wrong", buf.readLong(), 100*1024); + buf.readLong(); // skip checksum boolean allX = true; for (int i = 0; i < 1024; i++) { - byte x = buf.get(); + byte x = buf.readByte(); allX = allX && x == (byte)'X'; } assertFalse("Some of buffer should have been zeroed", allX); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 6ae5e60..067b411 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -20,9 +20,11 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -51,6 +53,7 @@ import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.TestUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -571,14 +574,13 @@ public class CompactionTest extends BookKeeperClusterTestCase { storage.gcThread.doCompactEntryLogs(threshold); } - private ByteBuffer genEntry(long ledger, long entry, int size) { - ByteBuffer bb = ByteBuffer.wrap(new byte[size]); - bb.putLong(ledger); - bb.putLong(entry); - while (bb.hasRemaining()) { - bb.put((byte)0xFF); + private ByteBuf genEntry(long ledger, long entry, int size) { + ByteBuf bb = Unpooled.buffer(size); + bb.writeLong(ledger); + bb.writeLong(entry); + while (bb.isWritable()) { + bb.writeByte((byte)0xFF); } - bb.flip(); return bb; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java index 4e1004c..7eac1a2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java @@ -20,9 +20,12 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -72,9 +75,9 @@ public class EntryLogTest { Bookie bookie = new Bookie(conf); // create some entries EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger; - logger.addEntry(1, generateEntry(1, 1)); - logger.addEntry(3, generateEntry(3, 1)); - logger.addEntry(2, generateEntry(2, 1)); + logger.addEntry(1, generateEntry(1, 1).nioBuffer()); + logger.addEntry(3, generateEntry(3, 1).nioBuffer()); + logger.addEntry(2, generateEntry(2, 1).nioBuffer()); logger.flush(); // now lets truncate the file to corrupt the last entry, which simulates a partial write File f = new File(curDir, "0.log"); @@ -91,13 +94,12 @@ public class EntryLogTest { assertNotNull(meta.getLedgersMap().get(3L)); } - private ByteBuffer generateEntry(long ledger, long entry) { + private ByteBuf generateEntry(long ledger, long entry) { byte[] data = ("ledger-" + ledger + "-" + entry).getBytes(); - ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]); - bb.putLong(ledger); - bb.putLong(entry); - bb.put(data); - bb.flip(); + ByteBuf bb = Unpooled.buffer(8 + 8 + data.length); + bb.writeLong(ledger); + bb.writeLong(entry); + bb.writeBytes(data); return bb; } @@ -120,7 +122,7 @@ public class EntryLogTest { EntryLogger logger = new EntryLogger(conf, bookie.getLedgerDirsManager()); for (int j=0; j<numEntries; j++) { - positions[i][j] = logger.addEntry(i, generateEntry(i, j)); + positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer()); } logger.flush(); } @@ -135,7 +137,7 @@ public class EntryLogTest { EntryLogger logger = new EntryLogger(conf, bookie.getLedgerDirsManager()); for (int j=0; j<numEntries; j++) { - positions[i][j] = logger.addEntry(i, generateEntry(i, j)); + positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer()); } logger.flush(); } @@ -231,10 +233,10 @@ public class EntryLogTest { // create some entries EntryLogger logger = ((InterleavedLedgerStorage)bookie.ledgerStorage).entryLogger; - logger.addEntry(1, generateEntry(1, 1)); - logger.addEntry(3, generateEntry(3, 1)); - logger.addEntry(2, generateEntry(2, 1)); - logger.addEntry(1, generateEntry(1, 2)); + logger.addEntry(1, generateEntry(1, 1).nioBuffer()); + logger.addEntry(3, generateEntry(3, 1).nioBuffer()); + logger.addEntry(2, generateEntry(2, 1).nioBuffer()); + logger.addEntry(1, generateEntry(1, 2).nioBuffer()); logger.rollLog(); logger.flushRotatedLogs(); @@ -265,10 +267,10 @@ public class EntryLogTest { // create some entries EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger; - logger.addEntry(1, generateEntry(1, 1)); - logger.addEntry(3, generateEntry(3, 1)); - logger.addEntry(2, generateEntry(2, 1)); - logger.addEntry(1, generateEntry(1, 2)); + logger.addEntry(1, generateEntry(1, 1).nioBuffer()); + logger.addEntry(3, generateEntry(3, 1).nioBuffer()); + logger.addEntry(2, generateEntry(2, 1).nioBuffer()); + logger.addEntry(1, generateEntry(1, 2).nioBuffer()); logger.rollLog(); // Rewrite the entry log header to be on V0 format http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java index b63806e..41ab89c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java @@ -21,6 +21,9 @@ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -330,7 +333,7 @@ public class LedgerCacheTest { Bookie b = new Bookie(conf); b.start(); for (int i = 1; i <= numLedgers; i++) { - ByteBuffer packet = generateEntry(i, 1); + ByteBuf packet = generateEntry(i, 1); b.addEntry(packet, new Bookie.NopWriteCallback(), null, "passwd".getBytes()); } @@ -511,7 +514,7 @@ public class LedgerCacheTest { // this bookie.addEntry call is required. FileInfo for Ledger 1 would be created with this call. // without the fileinfo, 'flushTestSortedLedgerStorage.addEntry' calls will fail because of BOOKKEEPER-965 change. bookie.addEntry(generateEntry(1, 1), new Bookie.NopWriteCallback(), null, "passwd".getBytes()); - + flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2)); assertFalse("Bookie is expected to be in ReadWrite mode", bookie.isReadOnly()); assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty()); @@ -535,14 +538,13 @@ public class LedgerCacheTest { assertTrue("EntryMemTable SnapShot is expected to be empty, because of successful flush", memTable.snapshot.isEmpty()); } - - private ByteBuffer generateEntry(long ledger, long entry) { + + private ByteBuf generateEntry(long ledger, long entry) { byte[] data = ("ledger-" + ledger + "-" + entry).getBytes(); - ByteBuffer bb = ByteBuffer.wrap(new byte[8 + 8 + data.length]); - bb.putLong(ledger); - bb.putLong(entry); - bb.put(data); - bb.flip(); + ByteBuf bb = Unpooled.buffer(8 + 8 + data.length); + bb.writeLong(ledger); + bb.writeLong(entry); + bb.writeBytes(data); return bb; } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 3672985..9352db3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -20,10 +20,10 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.buffer.ByteBuf; + import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; - import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Callable; @@ -35,17 +35,14 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; - import org.apache.bookkeeper.jmx.BKMBeanInfo; - +import org.apache.bookkeeper.meta.LedgerManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.junit.Test; import org.junit.Before; import org.junit.After; @@ -314,12 +311,12 @@ public class TestSyncThread { } @Override - public long addEntry(ByteBuffer entry) throws IOException { + public long addEntry(ByteBuf entry) throws IOException { return 1L; } @Override - public ByteBuffer getEntry(long ledgerId, long entryId) + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { return null; } @@ -334,11 +331,11 @@ public class TestSyncThread { } @Override - public void setExplicitlac(long ledgerId, ByteBuffer lac) { + public void setExplicitlac(long ledgerId, ByteBuf lac) { } @Override - public ByteBuffer getExplicitLac(long ledgerId) { + public ByteBuf getExplicitLac(long ledgerId) { return null; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java index d781d5d..41a8ecd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java @@ -39,6 +39,9 @@ import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.SettableFuture; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.Enumeration; @@ -72,7 +75,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase { Bookie delayBookie = new Bookie(conf) { @Override - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { @@ -86,7 +89,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase { } @Override - public void addEntry(ByteBuffer entry, WriteCallback cb, + public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { @@ -100,7 +103,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase { } @Override - public ByteBuffer readEntry(long ledgerId, long entryId) + public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException { try { Thread.sleep(5000); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java index a7bcf77..0c470a2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java @@ -17,6 +17,8 @@ */ package org.apache.bookkeeper.client; +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -194,7 +196,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase { throws Exception { Bookie sBookie = new Bookie(conf) { @Override - public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { latch.await(); @@ -204,7 +206,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase { } @Override - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { throw new IOException("Dead bookie for recovery adds."); } @@ -218,7 +220,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase { private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception { Bookie dBookie = new Bookie(conf) { @Override - public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { try { latch.await(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java index f54cde1..5f7c56f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java @@ -21,6 +21,8 @@ package org.apache.bookkeeper.client; * */ +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; @@ -186,7 +188,7 @@ public class LedgerRecoveryTest extends BaseTestCase { Bookie fakeBookie = new Bookie(conf) { @Override - public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void addEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { // drop request to simulate a slow and failed bookie } @@ -245,7 +247,7 @@ public class LedgerRecoveryTest extends BaseTestCase { ServerConfiguration conf = newServerConfiguration(); Bookie deadBookie1 = new Bookie(conf) { @Override - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { // drop request to simulate a slow and failed bookie throw new IOException("Couldn't write for some reason"); @@ -326,7 +328,7 @@ public class LedgerRecoveryTest extends BaseTestCase { ServerConfiguration conf = newServerConfiguration(); Bookie deadBookie1 = new Bookie(conf) { @Override - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { // drop request to simulate a slow and failed bookie throw new IOException("Couldn't write for some reason"); @@ -410,7 +412,7 @@ public class LedgerRecoveryTest extends BaseTestCase { private void startDeadBookie(ServerConfiguration conf) throws Exception { Bookie rBookie = new Bookie(conf) { @Override - public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey) + public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[] masterKey) throws IOException, BookieException { // drop request to simulate a dead bookie throw new IOException("Couldn't write entries for some reason"); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index 280db05..0db938b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -21,14 +21,16 @@ package org.apache.bookkeeper.meta; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import io.netty.buffer.ByteBuf; + import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -336,11 +338,11 @@ public class GcLedgersTest extends LedgerManagerTestCase { } @Override - public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException { + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { } @Override - public ByteBuffer getExplicitLac(long ledgerId) { + public ByteBuf getExplicitLac(long ledgerId) { return null; } @@ -369,12 +371,12 @@ public class GcLedgersTest extends LedgerManagerTestCase { } @Override - public long addEntry(ByteBuffer entry) throws IOException { + public long addEntry(ByteBuf entry) throws IOException { return 0; } @Override - public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 8d13102..ac9fe59 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -22,7 +22,6 @@ package org.apache.bookkeeper.meta; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -35,7 +34,6 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.LedgerDirsManager; -import org.apache.bookkeeper.bookie.LedgerStorage.LedgerDeletionListener; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.stats.StatsLogger; @@ -49,6 +47,8 @@ import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + /** * Test case to run over serveral ledger managers */ @@ -153,12 +153,12 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { } @Override - public long addEntry(ByteBuffer entry) throws IOException { + public long addEntry(ByteBuf entry) throws IOException { return 0; } @Override - public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException { + public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { return null; } @@ -213,13 +213,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { } @Override - public void setExplicitlac(long ledgerId, ByteBuffer lac) throws IOException { + public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException { // TODO Auto-generated method stub } @Override - public ByteBuffer getExplicitLac(long ledgerId) { + public ByteBuf getExplicitLac(long ledgerId) { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java index 16bbfd0..e2a4f32 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java @@ -235,7 +235,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase { Bookie delayBookie = new Bookie(conf) { @Override - public ByteBuffer readEntry(long ledgerId, long entryId) + public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException { try { Thread.sleep(3000); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index 428a597..159b859 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -40,6 +40,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; + import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -251,7 +253,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase { Bookie deadBookie = new Bookie(conf) { @Override - public ByteBuffer readEntry(long ledgerId, long entryId) + public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedgerException { // we want to disable during checking numReads.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0f81461d/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java index 94319df..7a65388 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java @@ -21,6 +21,9 @@ package org.apache.bookkeeper.test; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -148,12 +151,12 @@ public class ConcurrentLedgerTest { long start = System.currentTimeMillis(); for(int i = 1; i <= totalwrites/ledgers; i++) { for(int j = 1; j <= ledgers; j++) { - ByteBuffer entry = bookie.readEntry(j, i); + ByteBuf entry = bookie.readEntry(j, i); // skip the ledger id and the entry id - entry.getLong(); - entry.getLong(); - assertEquals(j + "@" + i, j+2, entry.getLong()); - assertEquals(j + "@" + i, i+3, entry.getLong()); + entry.readLong(); + entry.readLong(); + assertEquals(j + "@" + i, j+2, entry.readLong()); + assertEquals(j + "@" + i, i+3, entry.readLong()); } } long finish = System.currentTimeMillis(); @@ -184,7 +187,7 @@ public class ConcurrentLedgerTest { bytes.position(0); bytes.limit(bytes.capacity()); throttle.acquire(); - bookie.addEntry(bytes, cb, counter, zeros); + bookie.addEntry(Unpooled.wrappedBuffer(bytes), cb, counter, zeros); } } long finish = System.currentTimeMillis();
