HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not
waiting on all executors in a batch to complete. The test for no-more-executors
was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around
Object#notify"
It added this in AsyncProcess#waitForMaximumCurrentTasks:
synchronized (this.tasksInProgress) {
+ if (tasksInProgress.get() != oldInProgress) break;
this.tasksInProgress.wait(100);
which added a break out of our waiting loop if any change in
count of tasks; it seems that what was wanted was instead to
avoid the wait if there was movement in the count of completed
task.
Reformats waitForMaximumCurrentTasks so it is testable. Adds
test that we indeed wait on the specified parameter.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6904430a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6904430a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6904430a
Branch: refs/heads/HBASE-14850
Commit: 6904430a3d2bd87190b5f5d51a85d929684caae1
Parents: 60e19f6
Author: stack <[email protected]>
Authored: Thu May 12 14:37:29 2016 -0700
Committer: stack <[email protected]>
Committed: Fri May 13 17:51:27 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 18 ++++++---
.../hadoop/hbase/client/TestAsyncProcess.java | 42 ++++++++++++++++++++
2 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6904430a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 6de8c82..e8af36c 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1678,7 +1678,7 @@ class AsyncProcess {
synchronized (actionsInProgress) {
if (actionsInProgress.get() == 0) break;
if (!hasWait) {
- actionsInProgress.wait(100);
+ actionsInProgress.wait(10);
} else {
long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
TimeUnit.MICROSECONDS.timedWait(actionsInProgress,
waitMicroSecond);
@@ -1770,9 +1770,16 @@ class AsyncProcess {
/** Wait until the async does not have more than max tasks in progress. */
private void waitForMaximumCurrentTasks(int max) throws
InterruptedIOException {
+ waitForMaximumCurrentTasks(max, tasksInProgress, id);
+ }
+
+ // Break out this method so testable
+ @VisibleForTesting
+ static void waitForMaximumCurrentTasks(int max, final AtomicLong
tasksInProgress, final long id)
+ throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTime();
long currentInProgress, oldInProgress = Long.MAX_VALUE;
- while ((currentInProgress = this.tasksInProgress.get()) > max) {
+ while ((currentInProgress = tasksInProgress.get()) > max) {
if (oldInProgress != currentInProgress) { // Wait for in progress to
change.
long now = EnvironmentEdgeManager.currentTime();
if (now > lastLog + 10000) {
@@ -1783,9 +1790,10 @@ class AsyncProcess {
}
oldInProgress = currentInProgress;
try {
- synchronized (this.tasksInProgress) {
- if (tasksInProgress.get() != oldInProgress) break;
- this.tasksInProgress.wait(100);
+ synchronized (tasksInProgress) {
+ if (tasksInProgress.get() == oldInProgress) {
+ tasksInProgress.wait(10);
+ }
}
} catch (InterruptedException e) {
throw new InterruptedIOException("#" + id + ", interrupted." +
http://git-wip-us.apache.org/repos/asf/hbase/blob/6904430a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 1003d24..376c02a 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -1129,4 +1132,43 @@ public class TestAsyncProcess {
Assert.assertTrue(puts.isEmpty());
}
+ @Test
+ public void testWaitForMaximumCurrentTasks() throws InterruptedException,
BrokenBarrierException {
+ final AtomicLong tasks = new AtomicLong(0);
+ final AtomicInteger max = new AtomicInteger(0);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+ } catch (InterruptedIOException e) {
+ Assert.fail(e.getMessage());
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ };
+ // First test that our runnable thread only exits when tasks is zero.
+ Thread t = new Thread(runnable);
+ t.start();
+ barrier.await();
+ t.join();
+ // Now assert we stay running if max == zero and tasks is > 0.
+ barrier.reset();
+ tasks.set(1000000);
+ t = new Thread(runnable);
+ t.start();
+ barrier.await();
+ while (tasks.get() > 0) {
+ assertTrue(t.isAlive());
+ tasks.set(tasks.get() - 1);
+ }
+ t.join();
+ }
}