Repository: hbase
Updated Branches:
  refs/heads/branch-1 47eb79311 -> a3d550fbc


HBASE-15360 Fix flaky TestSimpleRpcScheduler; ADDENDUM


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3d550fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3d550fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3d550fb

Branch: refs/heads/branch-1
Commit: a3d550fbcaf2001588b42a2404e73e9864e385d6
Parents: 47eb793
Author: stack <st...@apache.org>
Authored: Wed Mar 23 15:47:22 2016 -0700
Committer: stack <st...@apache.org>
Committed: Wed Mar 23 15:47:22 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   |   5 +-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |   2 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       | 126 +++++++++++++------
 3 files changed, 95 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a3d550fb/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 37e86be..266c6a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
@@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
   private volatile long minDelay;
 
   // the moment when current interval ends
-  private volatile long intervalTime = System.currentTimeMillis();
+  private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
 
   // switch to ensure only one threads does interval cutoffs
   private AtomicBoolean resetDelay = new AtomicBoolean(true);
@@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
    *   and internal queue state (deemed overloaded).
    */
   private boolean needToDrop(CallRunner callRunner) {
-    long now = System.currentTimeMillis();
+    long now = EnvironmentEdgeManager.currentTime();
     long callDelay = now - callRunner.getCall().timestamp;
 
     long localMinDelay = this.minDelay;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3d550fb/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0cd34bb..431aeeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3d550fb/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 1229ea2..aae1203 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -16,6 +16,29 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.ipc;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -23,7 +46,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -32,11 +54,17 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,25 +72,11 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Message;
 
 @Category(SmallTests.class)
 public class TestSimpleRpcScheduler {
@@ -212,7 +226,7 @@ public class TestSimpleRpcScheduler {
       scheduler.dispatch(smallCallTask);
 
       while (work.size() < 8) {
-        Threads.sleepWithoutInterrupt(100);
+        Thread.sleep(100);
       }
 
       int seqSum = 0;
@@ -292,7 +306,7 @@ public class TestSimpleRpcScheduler {
       scheduler.dispatch(scanCallTask);
 
       while (work.size() < 6) {
-        Threads.sleepWithoutInterrupt(100);
+        Thread.sleep(100);
       }
 
       for (int i = 0; i < work.size() - 2; i += 3) {
@@ -320,6 +334,13 @@ public class TestSimpleRpcScheduler {
     }).when(callTask).run();
   }
 
+  private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
+      throws InterruptedException {
+    while (scheduler.getGeneralQueueLength() > 0) {
+      Thread.sleep(100);
+    }
+  }
+
   @Test
   public void testSoftAndHardQueueLimits() throws Exception {
     Configuration schedConf = HBaseConfiguration.create();
@@ -348,9 +369,7 @@ public class TestSimpleRpcScheduler {
       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
       scheduler.onConfigurationChange(schedConf);
       assertFalse(scheduler.dispatch(putCallTask));
-      while (scheduler.getGeneralQueueLength() > 0) {
-        Threads.sleepWithoutInterrupt(100);
-      }
+      waitUntilQueueEmpty(scheduler);
       schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
       scheduler.onConfigurationChange(schedConf);
       assertTrue(scheduler.dispatch(putCallTask));
@@ -359,8 +378,30 @@ public class TestSimpleRpcScheduler {
     }
   }
 
+  private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
+
+    private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
+
+    private long offset;
+
+    private final Set<String> threadNamePrefixs = new HashSet<>();
+
+    @Override
+    public long currentTime() {
+      for (String threadNamePrefix : threadNamePrefixs) {
+        if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
+          return timeQ.poll().longValue() + offset;
+        }
+      }
+      return System.currentTimeMillis();
+    }
+  }
+
   @Test
   public void testCoDelScheduling() throws Exception {
+    CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
+    envEdge.threadNamePrefixs.add("RW.default");
+    envEdge.threadNamePrefixs.add("B.default");
     Configuration schedConf = HBaseConfiguration.create();
 
     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
@@ -373,36 +414,51 @@ public class TestSimpleRpcScheduler {
       HConstants.QOS_THRESHOLD);
     try {
       scheduler.start();
-
+      EnvironmentEdgeManager.injectEdge(envEdge);
+      envEdge.offset = 5;
       // calls faster than min delay
       for (int i = 0; i < 100; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(5);
         scheduler.dispatch(cr);
       }
-      Thread.sleep(100); // make sure fast calls are handled
+      // make sure fast calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
+      envEdge.offset = 6;
       // calls slower than min delay, but not individually slow enough to be dropped
       for (int i = 0; i < 20; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(6);
         scheduler.dispatch(cr);
       }
 
-      Thread.sleep(100); // make sure somewhat slow calls are handled
+      // make sure somewhat slow calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
+      envEdge.offset = 12;
       // now slow calls and the ones to be dropped
       for (int i = 0; i < 20; i++) {
-        CallRunner cr = getMockedCallRunner();
+        long time = System.currentTimeMillis();
+        envEdge.timeQ.put(time);
+        CallRunner cr = getMockedCallRunner(time);
         Thread.sleep(12);
         scheduler.dispatch(cr);
       }
 
-      Thread.sleep(100); // make sure somewhat slow calls are handled
+      // make sure somewhat slow calls are handled
+      waitUntilQueueEmpty(scheduler);
+      Thread.sleep(100);
       assertTrue("There should have been at least 12 calls dropped",
         scheduler.getNumGeneralCallsDropped() > 12);
     } finally {
@@ -410,7 +466,7 @@ public class TestSimpleRpcScheduler {
     }
   }
 
-  private CallRunner getMockedCallRunner() throws IOException {
+  private CallRunner getMockedCallRunner(long timestamp) throws IOException {
     CallRunner putCallTask = mock(CallRunner.class);
     RpcServer.Call putCall = mock(RpcServer.Call.class);
     putCall.param = RequestConverter.buildMutateRequest(
@@ -418,7 +474,7 @@ public class TestSimpleRpcScheduler {
     RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
     when(putCallTask.getCall()).thenReturn(putCall);
     when(putCall.getHeader()).thenReturn(putHead);
-    putCall.timestamp = System.currentTimeMillis();
+    putCall.timestamp = timestamp;
     return putCallTask;
   }
 }

Reply via email to