Repository: hbase Updated Branches: refs/heads/branch-1.2 e77c57874 -> e1d695038
HBASE-18233 We shouldn't wait for readlock in doMiniBatchMutation in case of deadlock (Allan Yang) This patch plus a sorting of the batch (HBASE-17924) fixes a regression in Increment/CheckAndPut-style operations. Signed-off-by: Allan Yang <allan...@163.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1d69503 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1d69503 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1d69503 Branch: refs/heads/branch-1.2 Commit: e1d6950381daeb46f0218fc1de3256e88cfc0a1f Parents: e77c578 Author: Michael Stack <st...@apache.org> Authored: Tue Nov 28 09:14:58 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Tue Nov 28 09:15:04 2017 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 59 ++++++-- .../hadoop/hbase/regionserver/Region.java | 4 +- .../hadoop/hbase/client/TestMultiParallel.java | 141 +++++++++++++++++++ .../hbase/regionserver/TestAtomicOperation.java | 9 ++ 4 files changed, 203 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 0e73d1c..30202a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3051,18 +3051,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } + + //HBASE-18233 // If we haven't got any rows in our batch, we should block to - // get the next 
one. + // get the next one's read lock. We need at least one row to mutate. + // If we have got rows, do not block when lock is not available, + // so that we can fail fast and go on with the rows with locks in + // the batch. By doing this, we can reduce contention and prevent + // possible deadlocks. + // The unfinished rows in the batch will be detected in batchMutate, + // and it will try to finish them by calling doMiniBatchMutation again. + boolean shouldBlock = numReadyToWrite == 0; RowLock rowLock = null; try { - rowLock = getRowLock(mutation.getRow(), true); + rowLock = getRowLock(mutation.getRow(), true, shouldBlock); } catch (IOException ioe) { LOG.warn("Failed getting lock in batch put, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } if (rowLock == null) { - // We failed to grab another lock - break; // stop acquiring more rows for this batch + // We failed to grab another lock. Stop acquiring more rows for this + // batch and go on with the gotten ones + break; + } else { acquiredRowLocks.add(rowLock); } @@ -5138,7 +5149,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Get an exclusive ( write lock ) lock on a given row. * @param row Which row to lock. * @return A locked RowLock. The lock is exclusive and already aqquired. - * @throws IOException + * @throws IOException if any error occurred */ public RowLock getRowLock(byte[] row) throws IOException { return getRowLock(row, false); } @@ -5152,9 +5163,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * started (the calling thread has already acquired the region-close-guard lock). * @param row The row actions will be performed against * @param readLock is the lock reader or writer. True indicates that a non-exlcusive - lock is requested + lock is requested + * @return A locked RowLock. 
+ * @throws IOException if any error occurred */ public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { + return getRowLock(row, readLock, true); + } + + /** + * + * Get a row lock for the specified row. All locks are reentrant. + * + * Before calling this function make sure that a region operation has already been + * started (the calling thread has already acquired the region-close-guard lock). + * @param row The row actions will be performed against + * @param readLock is the lock reader or writer. True indicates that a non-exclusive + * lock is requested + * @param waitForLock whether we should wait for this lock + * @return A locked RowLock, or null if {@code waitForLock} is set to false and tryLock failed + * @throws IOException if any error occurred + */ + public RowLock getRowLock(byte[] row, boolean readLock, boolean waitForLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); // create an object to use a a key in the row lock map @@ -5195,12 +5225,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = rowLockContext.newWriteLock(); } } - if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { + boolean lockAvailable = false; + if(waitForLock) { + //if waiting for lock, wait for rowLockWaitDuration milliseconds + lockAvailable = result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS); + } else { + //if we are not waiting for lock, tryLock() will return immediately whether we have got + //this lock or not + lockAvailable = result.getLock().tryLock(); + } + if(!lockAvailable) { if (traceScope != null) { traceScope.getSpan().addTimelineAnnotation("Failed to get row lock"); } result = null; - throw new IOException("Timed out waiting for lock for row: " + rowKey); + if(waitForLock) { + throw new IOException("Timed out waiting for lock for row: " + rowKey); + } else { + return null; + } 
} rowLockContext.setThreadName(Thread.currentThread().getName()); success = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 81fb0b9..dcf151c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -270,14 +270,14 @@ public interface Region extends ConfigurationObserver { /** * Tries to acquire a lock on the given row. - * @param waitForLock if true, will block until the lock is available. + * @param readlock if true, will block until the lock is available. * Otherwise, just tries to obtain the lock and returns * false if unavailable. * @return the row lock if acquired, * null if waitForLock was false and the lock was not acquired * @throws IOException if waitForLock was true and the lock could not be acquired after waiting */ - RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException; + RowLock getRowLock(byte[] row, boolean readlock) throws IOException; /** * If the given list of row locks is not null, releases all locks. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 935f6e8..ecd67a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -63,6 +64,7 @@ public class TestMultiParallel { private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static final String FAMILY = "family"; private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table"); + private static final TableName TEST_TABLE2 = TableName.valueOf("multi_test_table2"); private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); private static final byte [][] KEYS = makeKeys(); @@ -728,4 +730,143 @@ public class TestMultiParallel { validateEmpty(result); } } + + private static class MultiThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List<Put> puts; + public MultiThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + HTable table = null; + try { + table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + table.setAutoFlush(false); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : 
puts) { + table.put(put); + } + table.flushCommits(); + } + } catch (Throwable t) { + throwable = t; + LOG.warn("Error when put:", t); + } finally { + endLatch.countDown(); + if(table != null) { + try { + table.close(); + } catch (IOException ioe) { + LOG.error("Error when close table", ioe); + } + } + } + } + } + + + private static class IncrementThread extends Thread { + public Throwable throwable = null; + private CountDownLatch endLatch; + private CountDownLatch beginLatch; + List<Put> puts; + public IncrementThread(List<Put> puts, CountDownLatch beginLatch, CountDownLatch endLatch) { + this.puts = puts; + this.beginLatch = beginLatch; + this.endLatch = endLatch; + } + @Override + public void run() { + HTable table = null; + try { + table = new HTable(UTIL.getConfiguration(), TEST_TABLE2); + beginLatch.await(); + for (int i = 0; i < 100; i++) { + for(Put put : puts) { + Increment inc = new Increment(put.getRow()); + inc.addColumn(BYTES_FAMILY, BYTES_FAMILY, 1); + table.increment(inc); + } + } + } catch (Throwable t) { + throwable = t; + LOG.warn("Error when incr:", t); + } finally { + endLatch.countDown(); + if(table != null) { + try { + table.close(); + } catch (IOException ioe) { + LOG.error("Error when close table", ioe); + } + } + } + } + } + + /** + * UT for HBASE-18233, test for disordered batch mutation thread and + * increment won't lock each other + * @throws Exception if any error occurred + */ + @Test(timeout=300000) + public void testMultiThreadWithRowLocks() throws Exception { + //set a short timeout to get timeout exception when getting row lock fail + UTIL.getConfiguration().setInt("hbase.rpc.timeout", 2000); + UTIL.getConfiguration().setInt("hbase.client.operation.timeout", 4000); + UTIL.getConfiguration().setInt("hbase.client.retries.number", 10); + + UTIL.createTable(TEST_TABLE2, BYTES_FAMILY); + List<Put> puts = new ArrayList<>(); + for(int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(BYTES_FAMILY, BYTES_FAMILY, 
Bytes.toBytes((long)0)); + puts.add(put); + } + List<Put> reversePuts = new ArrayList<>(puts); + Collections.reverse(reversePuts); + int NUM_OF_THREAD = 12; + CountDownLatch latch = new CountDownLatch(NUM_OF_THREAD); + CountDownLatch beginLatch = new CountDownLatch(1); + int threadNum = NUM_OF_THREAD / 4; + List<MultiThread> multiThreads = new ArrayList<>(); + List<IncrementThread> incThreads = new ArrayList<>(); + for(int i = 0; i < threadNum; i ++) { + MultiThread thread = new MultiThread(reversePuts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + MultiThread thread = new MultiThread(puts, beginLatch, latch); + thread.start(); + multiThreads.add(thread); + } + for(int i = 0; i < threadNum; i ++) { + IncrementThread thread = new IncrementThread(reversePuts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + for(int i = 0; i < threadNum; i++) { + IncrementThread thread = new IncrementThread(puts, beginLatch, latch); + thread.start(); + incThreads.add(thread); + } + long timeBegin = System.currentTimeMillis(); + beginLatch.countDown(); + latch.await(); + LOG.error("Time took:" + (System.currentTimeMillis() - timeBegin)); + for(MultiThread thread : multiThreads) { + Assert.assertTrue(thread.throwable == null); + } + for(IncrementThread thread : incThreads) { + Assert.assertTrue(thread.throwable == null); + } + + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e1d69503/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index a02f56a..cd325c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -626,6 +626,15 @@ public class TestAtomicOperation { } @Override + public RowLock getRowLock(final byte[] row, boolean readLock, boolean waitForLock) + throws IOException { + if (testStep == TestStep.CHECKANDPUT_STARTED) { + latch.countDown(); + } + return new WrappedRowLock(super.getRowLock(row, readLock, waitForLock)); + } + + @Override public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException { if (testStep == TestStep.CHECKANDPUT_STARTED) { latch.countDown();