HBASE-20017 BufferedMutatorImpl submit the same mutation repeatedly Signed-off-by: Andrew Purtell <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79d9403a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79d9403a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79d9403a Branch: refs/heads/HBASE-19064 Commit: 79d9403a79cca60e614834659e3d9005d5482cac Parents: 0068b95 Author: Chia-Ping Tsai <[email protected]> Authored: Sun Feb 18 21:45:04 2018 +0800 Committer: Andrew Purtell <[email protected]> Committed: Tue Feb 20 16:59:48 2018 -0800 ---------------------------------------------------------------------- .../hbase/client/BufferedMutatorImpl.java | 47 +++++++++++++++----- .../hadoop/hbase/client/TestAsyncProcess.java | 44 ++++++++++++++++++ .../hbase/regionserver/wal/ReaderBase.java | 2 +- 3 files changed, 81 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 9d24b4d..d4bc811 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -295,7 +295,7 @@ public class BufferedMutatorImpl implements BufferedMutator { break; } AsyncRequestFuture asf; - try (QueueRowAccess access = new QueueRowAccess()) { + try (QueueRowAccess access = createQueueRowAccess()) { if (access.isEmpty()) { // It means someone has gotten the ticker to run the flush. 
break; @@ -406,16 +406,46 @@ public class BufferedMutatorImpl implements BufferedMutator { return currentWriteBufferSize.get(); } + /** + * Count the mutations which haven't been processed. + * @return count of undealt mutation + */ @VisibleForTesting int size() { return undealtMutationCount.get(); } - private class QueueRowAccess implements RowAccess<Row>, Closeable { + /** + * Count the mutations which haven't been flushed + * @return count of unflushed mutation + */ + @VisibleForTesting + int getUnflushedSize() { + return writeAsyncBuffer.size(); + } + + @VisibleForTesting + QueueRowAccess createQueueRowAccess() { + return new QueueRowAccess(); + } + + @VisibleForTesting + class QueueRowAccess implements RowAccess<Row>, Closeable { private int remainder = undealtMutationCount.getAndSet(0); + private Mutation last = null; + + private void restoreLastMutation() { + // restore the last mutation since it isn't submitted + if (last != null) { + writeAsyncBuffer.add(last); + currentWriteBufferSize.addAndGet(last.heapSize()); + last = null; + } + } @Override public void close() { + restoreLastMutation(); if (remainder > 0) { undealtMutationCount.addAndGet(remainder); remainder = 0; @@ -425,25 +455,22 @@ public class BufferedMutatorImpl implements BufferedMutator { @Override public Iterator<Row> iterator() { return new Iterator<Row>() { - private final Iterator<Mutation> iter = writeAsyncBuffer.iterator(); private int countDown = remainder; - private Mutation last = null; @Override public boolean hasNext() { - if (countDown <= 0) { - return false; - } - return iter.hasNext(); + return countDown > 0; } @Override public Row next() { + restoreLastMutation(); if (!hasNext()) { throw new NoSuchElementException(); } - last = iter.next(); + last = writeAsyncBuffer.poll(); if (last == null) { throw new NoSuchElementException(); } + currentWriteBufferSize.addAndGet(-last.heapSize()); --countDown; return last; } @@ -452,8 +479,6 @@ public class BufferedMutatorImpl implements 
BufferedMutator { if (last == null) { throw new IllegalStateException(); } - iter.remove(); - currentWriteBufferSize.addAndGet(-last.heapSize()); --remainder; last = null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 2979dcd..4a2ed8d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -1795,4 +1795,48 @@ public class TestAsyncProcess { LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); } + + @Test + public void testQueueRowAccess() throws Exception { + ClusterConnection conn = createHConnection(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, + new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(100000)); + Put p0 = new Put(DUMMY_BYTES_1).addColumn(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1); + Put p1 = new Put(DUMMY_BYTES_2).addColumn(DUMMY_BYTES_2, DUMMY_BYTES_2, DUMMY_BYTES_2); + mutator.mutate(p0); + BufferedMutatorImpl.QueueRowAccess ra0 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.size()); + mutator.mutate(p1); + assertEquals(1, mutator.size()); + BufferedMutatorImpl.QueueRowAccess ra1 = mutator.createQueueRowAccess(); + // QueueRowAccess should take all undealt mutations + assertEquals(0, mutator.size()); + assertEquals(1, ra0.size()); + assertEquals(1, ra1.size()); + Iterator<Row> iter0 = ra0.iterator(); + Iterator<Row> iter1 = ra1.iterator(); + 
assertTrue(iter0.hasNext()); + assertTrue(iter1.hasNext()); + // the next() will poll the mutation from inner buffer and update the buffer count + assertTrue(iter0.next() == p0); + assertEquals(1, mutator.getUnflushedSize()); + assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); + assertTrue(iter1.next() == p1); + assertEquals(0, mutator.getUnflushedSize()); + assertEquals(0, mutator.getCurrentWriteBufferSize()); + assertFalse(iter0.hasNext()); + assertFalse(iter1.hasNext()); + // ra0 does handle the mutation so the mutation won't be pushed back to buffer + iter0.remove(); + ra0.close(); + assertEquals(0, mutator.size()); + assertEquals(0, mutator.getUnflushedSize()); + assertEquals(0, mutator.getCurrentWriteBufferSize()); + // ra1 doesn't handle the mutation so the mutation will be pushed back to buffer + ra1.close(); + assertEquals(1, mutator.size()); + assertEquals(1, mutator.getUnflushedSize()); + assertEquals(p1.heapSize(), mutator.getCurrentWriteBufferSize()); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/79d9403a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index f242cef..4338f6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -139,7 +139,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { * Initializes the compression after the shared stuff has been initialized. Called once. */ protected abstract void initAfterCompression() throws IOException; - + /** * Initializes the compression after the shared stuff has been initialized. Called once. 
* @param cellCodecClsName class name of cell Codec
