Repository: incubator-kylin Updated Branches: refs/heads/2.x-staging 134960c62 -> 71e6bed33
KYLIN-1135 pscan use cached thread pool Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e8187bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e8187bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e8187bb Branch: refs/heads/2.x-staging Commit: 8e8187bbd06efcd8fa91aa30d74a2d245b683b2b Parents: 134960c Author: honma <ho...@ebay.com> Authored: Mon Nov 16 11:11:40 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Mon Nov 16 11:11:40 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/util/BasicTest.java | 78 +++++++++++++++----- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 4 +- 2 files changed, 61 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 2beb2c6..d753a20 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -25,17 +25,22 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; -import java.util.IdentityHashMap; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.configuration.ConfigurationException; -import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.TreeMultiset; /** * <p/> @@ -118,28 +123,63 @@ public class BasicTest { @Test public void test0() throws Exception { - TreeMultiset<Long> xx = TreeMultiset.create(); - xx.add(2L); - xx.add(1L); - xx.add(1L); - for (Long hi : xx) { - System.out.println(hi); + ExecutorService executorService = Executors.newCachedThreadPool(); + List<Future<?>> futures = Lists.newArrayList(); + + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + throw new RuntimeException("hi"); + } + })); + + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + System.out.println("finish 1"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + })); + + try { + for (Future<?> future : futures) { + future.get(1, TimeUnit.HOURS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.out.println(e.getMessage()); + } + + futures.clear(); + + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(3000); + System.out.println("finish 2"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + })); + + try { + for (Future<?> future : futures) { + future.get(1, TimeUnit.HOURS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.out.println(e.getMessage()); } - System.out.println(Long.MAX_VALUE); - - IdentityHashMap<String, Void> a = new IdentityHashMap<>(); - IdentityHashMap<String, Void> b = new IdentityHashMap<>(); - String s1 = new String("s1"); - String s2 = new String("s1"); - Assert.assertEquals(s1, s2); - Assert.assertTrue(s1 != s2); - a.put(s1, null); - b.put(s2, null); } @Test @Ignore("convenient trial tool for dev") public void test1() throws Exception { + System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433833611000L)); System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L)); System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01 00:00:00")); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index b606d2e..5bc4a00 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -66,6 +66,8 @@ import com.google.protobuf.HBaseZeroCopyByteString; public class CubeHBaseEndpointRPC extends CubeHBaseRPC { + private static ExecutorService executorService = Executors.newCachedThreadPool(); + static class EndpointResultsAsGTScanner implements IGTScanner { private GTInfo info; private Iterator<byte[]> blocks; @@ -155,7 +157,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes); logger.info("Serialized scanRequestBytes's size is " + scanRequestBytes.length); - ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size()); final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); logger.info("Total RawScan range count: " + rawScans.size()); @@ -217,7 +218,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); futures.add(future); } - executorService.shutdown(); try { for (Future<?> future : futures) { future.get(1, TimeUnit.HOURS);