Repository: hbase
Updated Branches:
  refs/heads/branch-2 69fddb58b -> be034c26b


HBASE-18485 Performance issue: ClientAsyncPrefetchScanner is slower than 
ClientSimpleScanner


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/be034c26
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/be034c26
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/be034c26

Branch: refs/heads/branch-2
Commit: be034c26b4ba820651d3255798ef01dc958608d5
Parents: 69fddb5
Author: Guanghao Zhang <zg...@apache.org>
Authored: Fri Aug 4 23:10:18 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Aug 7 10:43:03 2017 +0800

----------------------------------------------------------------------
 .../client/ClientAsyncPrefetchScanner.java      | 181 ++++++-------------
 .../hadoop/hbase/client/ClientScanner.java      |   6 +-
 .../client/TestScannersFromClientSide.java      |   1 +
 3 files changed, 64 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/be034c26/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 34c5620..e8da18f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -18,15 +18,20 @@
 package org.apache.hadoop.hbase.client;
 
 import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -48,25 +53,18 @@ import org.apache.hadoop.hbase.util.Threads;
 @InterfaceAudience.Private
 public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
-  private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
-  private static final int DEFAULT_QUEUE_CAPACITY = 1024;
-
-  private int cacheCapacity;
+  private long maxCacheSize;
   private AtomicLong cacheSizeInBytes;
   // exception queue (from prefetch to main scan execution)
   private Queue<Exception> exceptionsQueue;
-  // prefetch runnable object to be executed asynchronously
-  private PrefetchRunnable prefetchRunnable;
-  // Boolean flag to ensure only a single prefetch is running (per scan)
-  // We use atomic boolean to allow multiple concurrent threads to
-  // consume records from the same cache, but still have a single prefetcher 
thread.
-  // For a single consumer thread this can be replace with a native boolean.
-  private AtomicBoolean prefetchRunning;
-  // an attribute for synchronizing close between scanner and prefetch threads
-  private AtomicLong closingThreadId;
+  // prefetch thread to be executed asynchronously
+  private Thread prefetcher;
   // used for testing
   private Consumer<Boolean> prefetchListener;
-  private static final int NO_THREAD = -1;
+
+  private final Lock lock = new ReentrantLock();
+  private final Condition notEmpty = lock.newCondition();
+  private final Condition notFull = lock.newCondition();
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, 
TableName name,
       ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
@@ -84,82 +82,56 @@ public class ClientAsyncPrefetchScanner extends 
ClientSimpleScanner {
   @Override
   protected void initCache() {
     // concurrent cache
-    cacheCapacity = calcCacheCapacity();
+    maxCacheSize = resultSize2CacheSize(maxScannerResultSize);
     cache = new LinkedBlockingQueue<>();
     cacheSizeInBytes = new AtomicLong(0);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
-    prefetchRunnable = new PrefetchRunnable();
-    prefetchRunning = new AtomicBoolean(false);
-    closingThreadId = new AtomicLong(NO_THREAD);
+    prefetcher = new Thread(new PrefetchRunnable());
+    Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher");
+  }
+
+  private long resultSize2CacheSize(long maxResultSize) {
+    // * 2 if possible
+    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize 
* 2;
   }
 
   @Override
   public Result next() throws IOException {
-
     try {
-      boolean hasExecutedPrefetch = false;
-      do {
+      lock.lock();
+      while (cache.isEmpty()) {
         handleException();
-
-        // If the scanner is closed and there's nothing left in the cache, 
next is a no-op.
-        if (getCacheCount() == 0 && this.closed) {
+        if (this.closed) {
           return null;
         }
-
-        if (prefetchCondition()) {
-          // run prefetch in the background only if no prefetch is already 
running
-          if (!isPrefetchRunning()) {
-            if (prefetchRunning.compareAndSet(false, true)) {
-              getPool().execute(prefetchRunnable);
-              hasExecutedPrefetch = true;
-            }
-          }
-        }
-
-        while (isPrefetchRunning()) {
-          // prefetch running or still pending
-          if (getCacheCount() > 0) {
-            return pollCache();
-          } else {
-            // (busy) wait for a record - sleep
-            Threads.sleep(1);
-          }
+        try {
+          notEmpty.await();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException("Interrupted when wait to load 
cache");
         }
+      }
 
-        if (getCacheCount() > 0) {
-          return pollCache();
-        }
-      } while (!hasExecutedPrefetch);
-
-      // if we exhausted this scanner before calling close, write out the scan 
metrics
-      writeScanMetrics();
-      return null;
+      Result result = pollCache();
+      if (prefetchCondition()) {
+        notFull.signalAll();
+      }
+      return result;
     } finally {
+      lock.unlock();
       handleException();
     }
   }
 
   @Override
   public void close() {
-    if (!scanMetricsPublished) writeScanMetrics();
-    closed = true;
-    if (!isPrefetchRunning()) {
-      if(closingThreadId.compareAndSet(NO_THREAD, 
Thread.currentThread().getId())) {
-        super.close();
-      }
-    } // else do nothing since the async prefetch still needs this resources
-  }
-
-  @Override
-  public int getCacheCount() {
-    if(cache != null) {
-      int size = cache.size();
-      if(size > cacheCapacity) {
-        cacheCapacity = size;
-      }
-      return size;
-    } else {
-      return 0;
+    try {
+      lock.lock();
+      super.close();
+      closed = true;
+      notFull.signalAll();
+      notEmpty.signalAll();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -182,44 +154,8 @@ public class ClientAsyncPrefetchScanner extends 
ClientSimpleScanner {
     }
   }
 
-  private boolean isPrefetchRunning() {
-    return prefetchRunning.get();
-  }
-
-  // double buffer - double cache size
-  private int calcCacheCapacity() {
-    int capacity = Integer.MAX_VALUE;
-    if(caching > 0 && caching < (Integer.MAX_VALUE /2)) {
-      capacity = caching * 2 + 1;
-    }
-    if(capacity == Integer.MAX_VALUE){
-      if(maxScannerResultSize != Integer.MAX_VALUE) {
-        capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
-      }
-      else {
-        capacity = DEFAULT_QUEUE_CAPACITY;
-      }
-    }
-    return Math.max(capacity, 1);
-  }
-
   private boolean prefetchCondition() {
-    return
-        (getCacheCount() < getCountThreshold()) &&
-        (maxScannerResultSize == Long.MAX_VALUE ||
-         getCacheSizeInBytes() < getSizeThreshold()) ;
-  }
-
-  private int getCountThreshold() {
-    return Math.max(cacheCapacity / 2, 1);
-  }
-
-  private long getSizeThreshold() {
-    return Math.max(maxScannerResultSize / 2, 1);
-  }
-
-  private long getCacheSizeInBytes() {
-    return cacheSizeInBytes.get();
+    return cacheSizeInBytes.get() < maxCacheSize / 2;
   }
 
   private Result pollCache() {
@@ -233,21 +169,22 @@ public class ClientAsyncPrefetchScanner extends 
ClientSimpleScanner {
 
     @Override
     public void run() {
-      boolean succeed = false;
-      try {
-        loadCache();
-        succeed = true;
-      } catch (Exception e) {
-        exceptionsQueue.add(e);
-      } finally {
-        if (prefetchListener != null) {
-          prefetchListener.accept(succeed);
-        }
-        prefetchRunning.set(false);
-        if(closed) {
-          if (closingThreadId.compareAndSet(NO_THREAD, 
Thread.currentThread().getId())) {
-            // close was waiting for the prefetch to end
-            close();
+      while (!closed) {
+        boolean succeed = false;
+        try {
+          lock.lock();
+          while (!prefetchCondition()) {
+            notFull.await();
+          }
+          loadCache();
+          succeed = true;
+        } catch (Exception e) {
+          exceptionsQueue.add(e);
+        } finally {
+          notEmpty.signalAll();
+          lock.unlock();
+          if (prefetchListener != null) {
+            prefetchListener.accept(succeed);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/be034c26/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index d3b19e4..2522434 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -72,7 +72,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
   protected Result lastResult = null;
   protected final long maxScannerResultSize;
   private final ClusterConnection connection;
-  private final TableName tableName;
+  protected final TableName tableName;
   protected final int scannerTimeout;
   protected boolean scanMetricsPublished = false;
   protected RpcRetryingCaller<Result[]> caller;
@@ -412,6 +412,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     // This is possible if we just stopped at the boundary of a region in the 
previous call.
     if (callable == null) {
       if (!moveToNextRegion()) {
+        closed = true;
         return;
       }
     }
@@ -478,7 +479,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
         assert newLimit >= 0;
         scan.setLimit(newLimit);
       }
-      if (scanExhausted(values)) {
+      if (scan.getLimit() == 0 || scanExhausted(values)) {
         closeScanner();
         closed = true;
         break;
@@ -532,6 +533,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       // we are done with the current region
       if (regionExhausted) {
         if (!moveToNextRegion()) {
+          closed = true;
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/be034c26/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index ef00b24..43be573 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -754,6 +754,7 @@ public class TestScannersFromClientSide {
       result = Result.create(kvListScan);
       verifyResult(result, kvListExp, toLog, "Testing async scan");
     }
+
     TEST_UTIL.deleteTable(table);
   }
 

Reply via email to