Repository: kylin Updated Branches: refs/heads/2.x-staging 2528111b4 -> 36e26528e
KYLIN-1391 quicker and better response to v2 storage engine's rpc timeout exception Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/36e26528 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/36e26528 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/36e26528 Branch: refs/heads/2.x-staging Commit: 36e26528e9f2c45d57c6d974d52b2cf1af8ac315 Parents: 2528111 Author: honma <[email protected]> Authored: Tue Feb 2 11:36:23 2016 +0800 Committer: honma <[email protected]> Committed: Wed Feb 3 14:34:12 2016 +0800 ---------------------------------------------------------------------- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 46 ++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/36e26528/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index d6ef16c..f22964f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -25,8 +25,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; @@ -34,6 +37,7 @@ import java.util.zip.DataFormatException; import javax.annotation.Nullable; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -48,6 +52,7 @@ import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; @@ -70,17 +75,52 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { public static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class); - private static ExecutorService executorService = Executors.newCachedThreadPool(); + private static ExecutorService executorService = newLoggableCachedThreadPool(); + + public static ExecutorService newLoggableCachedThreadPool() { + return new LoggableCachedThreadPool(); + } + + public static class LoggableCachedThreadPool extends ThreadPoolExecutor { + + public LoggableCachedThreadPool() { + super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t == null && r instanceof Future<?>) { + try { + ((Future<?>) r).get(); + } catch (ExecutionException ee) { + logger.error("Execution exception when running task in " + Thread.currentThread().getName()); + t = ee.getCause(); + } catch (InterruptedException ie) { + logger.error("Thread interrupted: "); + Thread.currentThread().interrupt(); // ignore/reset + } catch (Throwable throwable) { + t = throwable; + } + } + if (t != null) { + logger.error("Caught exception in thread " + Thread.currentThread().getName() + ": ", t); + } + } + } static class ExpectedSizeIterator implements Iterator<byte[]> { int expectedSize; int current = 0; BlockingQueue<byte[]> queue; + long timeout; public ExpectedSizeIterator(int expectedSize) { this.expectedSize = expectedSize; this.queue = new ArrayBlockingQueue<byte[]>(expectedSize); + this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + logger.info("Timeout for ExpectedSizeIterator is " + this.timeout); } @Override @@ -95,7 +135,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } try { current++; - return queue.poll(1, TimeUnit.HOURS); + return queue.poll(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("error when waiting queue", e); }
