Author: liyin Date: Thu Apr 17 00:48:37 2014 New Revision: 1588111 URL: http://svn.apache.org/r1588111 Log: [HBASE-10709] Fix the bug where HTableClientScanner throws a RejectedExecutionException when too many scanners are running at the same time.
Author: daviddeng Summary: The fetcher executor's `SynchronousQueue` is replaced with a `LinkedBlockingQueue` (the work queue used by `Executors.newFixedThreadPool`), so scan tasks submitted while all pool threads are busy are queued instead of rejected. The background fetcher thread is no longer blocked waiting for the main thread to consume the data. Test Plan: `TestHTableClientScanner` Reviewers: liyintang, manukranthk, elliott, ehwang Reviewed By: ehwang CC: hbase-eng@, elliott, ehwang Differential Revision: https://phabricator.fb.com/D1271322 Task ID: 4073520 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java Thu Apr 17 00:48:37 2014 @@ -24,9 +24,9 @@ import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,13 +36,14 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.NotServingRegionException; import 
org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DaemonThreadFactory; /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate * through them all. */ -public class HTableClientScanner implements ResultScanner, Runnable { +public class HTableClientScanner implements ResultScanner { private static final Log LOG = LogFactory.getLog(HTableClientScanner.class); // End of Scanning private static final Result[] EOS = new Result[0]; @@ -50,14 +51,10 @@ public class HTableClientScanner impleme private static final int MAX_THREADS_IN_POOL = Runtime.getRuntime() .availableProcessors(); - private static final ExecutorService executor = new ThreadPoolExecutor(1, - MAX_THREADS_IN_POOL, 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); - - // HEADSUP: The scan internal start row can change as we move through table. - protected final Scan scan; - // The number of prefetched and cached results - private final int caching; + private static final ExecutorService executor = Executors.newFixedThreadPool( + MAX_THREADS_IN_POOL, new DaemonThreadFactory( + "HTableClientScanner.Fetching.")); + // Temporary results list in main thread, may be null private Result[] currentResults; // The position of next unfetched results in currentResults if it is @@ -65,37 +62,44 @@ public class HTableClientScanner impleme private int currentPos; // Whether this client has closed. private boolean closed; - /** - * The queue transferring fetched Result[] to main thread. - * When queue.take() returns an EOS, scanning ends. - */ + // The queue transferring fetched Result[] to main thread. + // When queue.take() returns an EOS, scanning ends. private final ArrayBlockingQueue<Result[]> queue; + // A place storing Result[] in case the queue is full. It is set only at + // fetcher thread, will be cleared in main thread. 
+ private final AtomicReference<Result[]> justFetched = new AtomicReference<>(); + // Contains exception thrown in fetcher thread. + private final AtomicReference<Throwable> exception = new AtomicReference<>(); // The variable informing fetching thread to stop - private volatile boolean closing; - // Contains the exception caught in fetch thread. - private volatile Throwable exception; + private final AtomicBoolean closing = new AtomicBoolean(false); - private final HTable table; + private final Fetcher fetcher; /** * Constructor. + * + * @param scan The scan internal start row can change as we move through + * table. */ - public HTableClientScanner(Scan scan, HTable table) { - this.scan = scan; - this.table = table; + public HTableClientScanner(Scan scan, HTable table) throws IOException { this.queue = new ArrayBlockingQueue<>(table.getConfiguration().getInt( HConstants.HBASE_CLIENT_SCANNER_QUEUE_LENGTH, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH)); + int caching; if (scan.getCaching() > 0) { - this.caching = scan.getCaching(); + caching = scan.getCaching(); } else { - this.caching = table.getScannerCaching(); + caching = table.getScannerCaching(); } + + fetcher = new Fetcher(table, scan, caching, queue, justFetched, exception, + closing); } HTableClientScanner initialize() { - executor.execute(this); + executor.execute(fetcher); + return this; } @@ -117,7 +121,7 @@ public class HTableClientScanner impleme } /** - * Fetches results from queue to currentResults if it is not null. + * Fetches results from queue to currentResults if it is null. 
* * @return true if more results available, false if end of scanning */ @@ -132,27 +136,39 @@ public class HTableClientScanner impleme try { currentResults = queue.take(); + if (currentResults.length == 0) { // End of scanning closed = true; currentResults = null; - if (exception != null) { - + Throwable e = this.exception.get(); + if (e != null) { // Failure of scanning - throwIOException(exception); + throwIOException(e); } return false; } - // Results fetched - currentPos = 0; - return true; + Result[] jf = justFetched.getAndSet(null); + if (jf != null) { + // Something is put justFetched because the queue is full when those + // results are fetched. The fetching task should not be running now. + queue.add(jf); + if (jf.length > 0) { + // We may have more results + executor.execute(fetcher); + } + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } + + // Results fetched + currentPos = 0; + return true; } @Override @@ -212,7 +228,7 @@ public class HTableClientScanner impleme if (this.closed) { return; } - this.closing = true; + this.closing.set(true); try { while (fetchFromQueue()) { // skip all results @@ -224,137 +240,219 @@ public class HTableClientScanner impleme } } - private Result[] call(ScannerCallable callable) throws IOException { - return table.getConnectionAndResetOperationContext() - .getRegionServerWithRetries(callable); + @Override + public boolean isClosed() { + return closed; } - // Returns a ScannerCallable with a start key - private ScannerCallable getScannerCallable(byte[] startKey) { - scan.setStartRow(startKey); - ScannerCallable s = new ScannerCallable( - table.getConnectionAndResetOperationContext(), - table.getTableNameStringBytes(), - scan, table.getOptions()); - s.setCaching(caching); - return s; - } + private static class Fetcher implements Runnable { + // The startKey for opening a scanner. 
+ private byte[] startKey; + // The callable for scanning + private ScannerCallable callable; + // Current scanning region info. + private HRegionInfo currentRegion; + // Timestamp of last successful scan + private long lastSuccNextTs; + // The last result returned. + private Result lastRes = null; + + private final HTable table; + private final Scan scan; + private final int caching; + + private final ArrayBlockingQueue<Result[]> queue; + private final AtomicReference<Result[]> justFetched; + private final AtomicReference<Throwable> exception; + private final AtomicBoolean closing; + + public Fetcher(HTable table, Scan scan, int caching, + ArrayBlockingQueue<Result[]> queue, + AtomicReference<Result[]> justFetched, + AtomicReference<Throwable> exception, AtomicBoolean closing) { + this.table = table; + this.scan = scan; + this.caching = caching; + + this.queue = queue; + this.justFetched = justFetched; + this.exception = exception; + this.closing = closing; + + // Initialize startKey + startKey = scan.getStartRow(); + if (startKey == null) { + // In case startKey == null, set it to zero-length byte array since + // null means end-of-scan. + startKey = HConstants.EMPTY_BYTE_ARRAY; + } + } - // Closes a callable silently. - private void closeScanner(ScannerCallable callable) { - callable.setClose(); - try { - call(callable); - } catch (IOException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just an UnknownScanner - // exception due to lease time out. 
- LOG.error("Exception caught during closeScanner", e); + private Result[] call(ScannerCallable callable) throws IOException { + return table.getConnectionAndResetOperationContext() + .getRegionServerWithRetries(callable); + } + + // Returns a ScannerCallable with a start key + private ScannerCallable getScannerCallable(byte[] startKey) { + scan.setStartRow(startKey); + ScannerCallable s = + new ScannerCallable(table.getConnectionAndResetOperationContext(), + table.getTableNameStringBytes(), scan, table.getOptions()); + s.setCaching(caching); + return s; + } + + // Closes a callable silently. + private void closeScanner(ScannerCallable callable) { + callable.setClose(); + try { + call(callable); + } catch (IOException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just an UnknownScanner + // exception due to lease time out. + LOG.error("Exception caught during closeScanner", e); + } } - } - /** - * Scans a region server, results are put to queue. - * - * @return New start key if scanning does not end, null otherwise - * @throws IOException - * @throws InterruptedException - */ - private byte[] scanRegionServer(byte[] startKey) throws IOException, - InterruptedException { - // Open a scanner - ScannerCallable callable = getScannerCallable(startKey); - // openScanner - call(callable); - HRegionInfo currentRegion = callable.getHRegionInfo(); + /** + * Keep scanning on a region server. If we can get some results, they are + * returned, otherwise a null is returned. + * + * startKey is changed if necessary. At the end of scanning, it is set to + * null. + * + * @return an non-empty array of Result if we get some data. null otherwise. 
+ */ + private Result[] scanRegionServer() throws IOException, + InterruptedException { + if (callable == null) { + // Open a scanner + callable = getScannerCallable(startKey); + // openScanner + call(callable); + currentRegion = callable.getHRegionInfo(); - Result lastRes = null; - long lastSuccNextTs = System.currentTimeMillis(); - try { - while (!closing) { + lastRes = null; + lastSuccNextTs = System.currentTimeMillis(); + } + + boolean keepCallable = false; + + try { Result[] values = call(callable); if (values == null) { // End of scanning - return null; + startKey = null; } else if (values.length == 0) { // End of region - return currentRegion.getEndKey(); - } + startKey = currentRegion.getEndKey(); + // Mark startKey as null for last region. + if (startKey != null && startKey.length == 0) { + startKey = null; + } + } else { + // We got some results + lastRes = values[values.length - 1]; + lastSuccNextTs = System.currentTimeMillis(); + + // In this case, we keep callable + keepCallable = true; - lastRes = values[values.length - 1]; - if (!closing) { - queue.put(values); + return values; } - lastSuccNextTs = System.currentTimeMillis(); - } - } catch (DoNotRetryIOException e) { - boolean canRetry = false; - if (e instanceof UnknownScannerException) { - long timeoutTs = lastSuccNextTs + table.scannerTimeout; - long now = System.currentTimeMillis(); - if (now > timeoutTs) { - // Scanner timeout - long elapsed = now - lastSuccNextTs; - ScannerTimeoutException ex = new ScannerTimeoutException(elapsed - + "ms pased since the last invocation, " - + "timetout is current set to " + table.scannerTimeout); - ex.initCause(e); - throw ex; + } catch (DoNotRetryIOException e) { + boolean canRetry = false; + if (e instanceof UnknownScannerException) { + // The region server may restarted. 
+ long timeoutTs = lastSuccNextTs + table.scannerTimeout; + long now = System.currentTimeMillis(); + if (now > timeoutTs) { + // Scanner timeout + long elapsed = now - lastSuccNextTs; + ScannerTimeoutException ex = + new ScannerTimeoutException(elapsed + + "ms pased since the last invocation, " + + "timetout is current set to " + table.scannerTimeout); + ex.initCause(e); + throw ex; + } + + canRetry = true; // scannerTimeout + } else { + Throwable cause = e.getCause(); + if (cause != null && cause instanceof NotServingRegionException) { + canRetry = true; + } } - canRetry = true; // scannerTimeout - } else { - Throwable cause = e.getCause(); - if (cause != null && cause instanceof NotServingRegionException) { - canRetry = true; + if (!canRetry) { + // Cannot retry, simply throw it out + throw e; } - } - - if (!canRetry) { - // Cannot retry, simply throw it out - throw e; - } - if (lastRes != null) { - return Bytes.nextOf(lastRes.getRow()); + if (lastRes != null) { + // Skip lastRes since it has been returned. + startKey = Bytes.nextOf(lastRes.getRow()); + } + } finally { + if (!keepCallable) { + closeScanner(callable); + callable = null; + } } - return startKey; - } finally { - closeScanner(callable); + return null; } - // Only reach here when closing is true - return null; - } - @Override - public void run() { - try { - byte[] startKey = this.scan.getStartRow(); - while (!closing) { - startKey = scanRegionServer(startKey); - if (startKey == null || startKey.length == 0) { - break; + /** + * Puts results in queue or justFetched. + * + * @return whether we should continue fetching in this run. + */ + private boolean putResults(Result[] results) { + if (!queue.offer(results)) { + // queue is full, put results in justFetched + justFetched.set(results); + + if (queue.isEmpty()) { + // It's possible the queue is empty before justFetched is set + // and the main thread is blocking on queue.Take(). + // We try move results in justFetched to queue here. 
+ Result[] js = justFetched.getAndSet(null); + if (js != null) { + queue.add(js); + return true; + } + // If js == null, it means the main thread moved justFetched to + // queue and arranged a new run. } + // Then quit from this run. New run is submitted when some results + // are taken out of the queue + return false; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - exception = e; - } catch (Throwable e) { - exception = e; + return true; } - try { - queue.put(EOS); - } catch (InterruptedException e) { - LOG.info("Fetching thread interrupted", e); - Thread.currentThread().interrupt(); + @Override + public void run() { + try { + while (!closing.get() && startKey != null) { + Result[] results = scanRegionServer(); + + if (results != null) { + if (!putResults(results)) { + return; + } + } + } + } catch (Throwable e) { + exception.set(e); + } + // We only get here scanning is over or aborted with exception + putResults(EOS); } } - - @Override - public boolean isClosed() { - return closed; - } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1588111&r1=1588110&r2=1588111&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Apr 17 00:48:37 2014 @@ -113,7 +113,8 @@ public interface HTableInterface { * Returns a scanner on the current table as specified by the {@link Scan} * object. * - * @param scan A configured {@link Scan} object. + * @param scan A configured {@link Scan} object. NOTE scan may be kept and + * changed inside. The caller should not reuse it. * @return A scanner. 
* @throws IOException if a remote or network exception occurs. * @since 0.20.0 Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1588111&r1=1588110&r2=1588111&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Apr 17 00:48:37 2014 @@ -1951,12 +1951,13 @@ REGION_LOOP: public static int countRows(final HTable t, final Scan s) throws IOException { // Assert all rows in table. - ResultScanner scanner = t.getScanner(s); - int count = 0; - for (Result result: scanner) { - count++; - assertTrue(result.size() > 0); + try (ResultScanner scanner = t.getScanner(s)) { + int count = 0; + for (Result result : scanner) { + count++; + assertTrue(result.size() > 0); + } + return count; } - return count; } } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java Thu Apr 17 00:48:37 2014 @@ -22,11 +22,17 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import junit.framework.Assert; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.StringBytes; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -39,7 +45,6 @@ public class TestHTableClientScanner { final Log LOG = LogFactory.getLog(getClass()); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] TABLE_NAME = Bytes.toBytes("TABLE"); private static final byte[] FAMILY = Bytes.toBytes("FAMILY"); private static final int SLAVES = 3; @@ -58,6 +63,7 @@ public class TestHTableClientScanner { @Test public void testScanner() throws IOException { + final StringBytes TABLE_NAME = new StringBytes("testScanner"); HTable table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 3, Bytes.toBytes("bbb"), Bytes.toBytes("yyy"), 25); @@ -66,4 +72,48 @@ public class TestHTableClientScanner { int counted = HBaseTestingUtility.countRows(table, new Scan()); assertEquals("rowCount", rowCount, counted); } + + /** + * Testing parallel scanning with more threads than background threads. 
+ */ + @Test + public void testMoreThreads() throws Exception { + final int ROW_COUNT = 10000; + final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1; + final StringBytes TABLE_NAME = new StringBytes("testMoreThreads"); + + HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILY); + table.setAutoFlush(false); + for (int i = 0; i < ROW_COUNT; i++) { + byte[] row = Bytes.toBytes("row-" + i); + Put put = new Put(row).add(FAMILY, row, row); + table.put(put); + } + table.flushCommits(); + + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + Future<?>[] futures = new Future<?>[THREAD_COUNT]; + for (int i = 0; i < THREAD_COUNT; i++) { + futures[i] = executor.submit(new Runnable() { + @Override + public void run() { + try { + HTable table = new HTableAsync(TEST_UTIL.getConfiguration(), + TABLE_NAME); + try (ResultScanner scanner = table.getScanner(new Scan())) { + for (Result result : scanner) { + Assert.assertTrue("result.size should > 0", result.size() > 0); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + for (Future<?> future : futures) { + future.get(); + } + } }
