Repository: hbase
Updated Branches:
  refs/heads/master f041306cd -> 665fe3eef


HBASE-17331 Avoid busy waiting in ThrottledInputStream (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/665fe3ee
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/665fe3ee
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/665fe3ee

Branch: refs/heads/master
Commit: 665fe3eef17a96da037901cba776880d0499a9c1
Parents: f041306
Author: tedyu <[email protected]>
Authored: Sun Dec 18 08:15:15 2016 -0800
Committer: tedyu <[email protected]>
Committed: Sun Dec 18 08:15:15 2016 -0800

----------------------------------------------------------------------
 .../io/hadoopbackport/ThrottledInputStream.java | 43 ++++++++++++++----
 .../TestThrottledInputStream.java               | 48 ++++++++++++++++++++
 .../org/apache/hadoop/hbase/SplitLogTask.java   |  2 +-
 3 files changed, 82 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/665fe3ee/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
index 1bef221..88c1b37 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.hbase.io.hadoopbackport;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * The ThrottleInputStream provides bandwidth throttling on a specified
@@ -44,8 +48,6 @@ public class ThrottledInputStream extends InputStream {
   private long bytesRead = 0;
   private long totalSleepTime = 0;
 
-  private static final long SLEEP_DURATION_MS = 50;
-
   public ThrottledInputStream(InputStream rawStream) {
     this(rawStream, Long.MAX_VALUE);
   }
@@ -118,14 +120,35 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }
 
-  private void throttle() throws IOException {
-    while (getBytesPerSec() > maxBytesPerSec) {
-      try {
-        Thread.sleep(SLEEP_DURATION_MS);
-        totalSleepTime += SLEEP_DURATION_MS;
-      } catch (InterruptedException e) {
-        throw new IOException("Thread aborted", e);
-      }
+  private long calSleepTimeMs() {
+    return calSleepTimeMs(bytesRead, maxBytesPerSec,
+      EnvironmentEdgeManager.currentTime() - startTime);
+  }
+
+  @VisibleForTesting
+  static long calSleepTimeMs(long bytesRead, long maxBytesPerSec, long elapsed) {
+    assert elapsed > 0 : "The elapsed time should be greater than zero";
+    if (bytesRead <= 0 || maxBytesPerSec <= 0) {
+      return 0;
+    }
+    // We use this class to load the single source file, so the bytesRead
+    // and maxBytesPerSec aren't greater than Double.MAX_VALUE.
+    // We can get the precise sleep time by using the double value.
+    long rval = (long) ((((double) bytesRead) / ((double) maxBytesPerSec)) * 1000 - elapsed);
+    if (rval <= 0) {
+      return 0;
+    } else {
+      return rval;
+    }
+  }
+
+  private void throttle() throws InterruptedIOException {
+    long sleepTime = calSleepTimeMs();
+    totalSleepTime += sleepTime;
+    try {
+      TimeUnit.MILLISECONDS.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Thread aborted");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/665fe3ee/hbase-common/src/test/java/org/apache/hadoop/hbase/io/hadoopbackport/TestThrottledInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/hadoopbackport/TestThrottledInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/hadoopbackport/TestThrottledInputStream.java
new file mode 100644
index 0000000..15d292d
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/hadoopbackport/TestThrottledInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.io.hadoopbackport;
+
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestThrottledInputStream {
+
+  @Test
+  public void testCalSleepTimeMs() {
+    // case 0: initial - no read, no sleep
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(0, 10000, 1000));
+
+    // case 1: no threshold
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(Long.MAX_VALUE, 0, 1000));
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(Long.MAX_VALUE, -1, 1000));
+
+    // case 2: too fast
+    assertEquals(1500, ThrottledInputStream.calSleepTimeMs(5, 2, 1000));
+    assertEquals(500, ThrottledInputStream.calSleepTimeMs(5, 2, 2000));
+    assertEquals(6500, ThrottledInputStream.calSleepTimeMs(15, 2, 1000));
+
+    // case 3: too slow
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(1, 2, 1000));
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(2, 2, 2000));
+    assertEquals(0, ThrottledInputStream.calSleepTimeMs(1, 2, 1000));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/665fe3ee/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index 66493e1..03d5108 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -72,7 +72,7 @@ public class SplitLogTask {
   SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) {
     this.originServer = ProtobufUtil.toServerName(slt.getServerName());
     this.state = slt.getState();
-    this.mode = (slt.hasMode()) ? slt.getMode() : 
+    this.mode = (slt.hasMode()) ? slt.getMode() :
       ZooKeeperProtos.SplitLogTask.RecoveryMode.UNKNOWN;
   }
 

Reply via email to