This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-21512 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 00365d4ca2cf7dc01a9a99ab31668d42db773349 Author: zhangduo <zhang...@apache.org> AuthorDate: Sun Jun 2 21:54:29 2019 +0800 HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutdown --- .../hadoop/hbase/client/TableOverAsyncTable.java | 38 ++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 30e3062..5686b09 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -38,11 +38,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -422,20 +425,29 @@ class TableOverAsyncTable implements Table { // get regions covered by the row range List<byte[]> keys = getStartKeysInRange(startKey, endKey); Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (byte[] r : keys) { - RegionCoprocessorRpcChannel channel = coprocessorService(r); - Future<R> future = pool.submit(new Callable<R>() { - @Override - public R call() throws Exception { - R result = call.call(channel); - byte[] region = channel.getLastRegion(); - if 
(callback != null) { - callback.update(region, r, result); + try { + for (byte[] r : keys) { + RegionCoprocessorRpcChannel channel = coprocessorService(r); + Future<R> future = pool.submit(new Callable<R>() { + @Override + public R call() throws Exception { + R result = call.call(channel); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); + } + return result; } - return result; - } - }); - futures.put(r, future); + }); + futures.put(r, future); + } + } catch (RejectedExecutionException e) { + // maybe the connection has been closed, let's check + if (pool.isShutdown()) { + throw new DoNotRetryIOException("Connection is closed", e); + } else { + throw new HBaseIOException("Coprocessor operation is rejected", e); + } } for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) { try {