Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 134960c62 -> 71e6bed33


KYLIN-1135 pscan use cached thread pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8e8187bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8e8187bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8e8187bb

Branch: refs/heads/2.x-staging
Commit: 8e8187bbd06efcd8fa91aa30d74a2d245b683b2b
Parents: 134960c
Author: honma <ho...@ebay.com>
Authored: Mon Nov 16 11:11:40 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Mon Nov 16 11:11:40 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java | 78 +++++++++++++++-----
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |  4 +-
 2 files changed, 61 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java 
b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 2beb2c6..d753a20 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -25,17 +25,22 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.TreeMultiset;
 
 /**
  * <p/>
@@ -118,28 +123,63 @@ public class BasicTest {
     @Test
     public void test0() throws Exception {
 
-        TreeMultiset<Long> xx = TreeMultiset.create();
-        xx.add(2L);
-        xx.add(1L);
-        xx.add(1L);
-        for (Long hi : xx) {
-            System.out.println(hi);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        List<Future<?>> futures = Lists.newArrayList();
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                throw new RuntimeException("hi");
+            }
+        }));
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                    System.out.println("finish 1");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }));
+
+        try {
+            for (Future<?> future : futures) {
+                future.get(1, TimeUnit.HOURS);
+            }
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            System.out.println(e.getMessage());
+        }
+
+        futures.clear();
+
+        futures.add(executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(3000);
+                    System.out.println("finish 2");
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }));
+
+        try {
+            for (Future<?> future : futures) {
+                future.get(1, TimeUnit.HOURS);
+            }
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            System.out.println(e.getMessage());
         }
-        System.out.println(Long.MAX_VALUE);
-
-        IdentityHashMap<String, Void> a = new IdentityHashMap<>();
-        IdentityHashMap<String, Void> b = new IdentityHashMap<>();
-        String s1 = new String("s1");
-        String s2 = new String("s1");
-        Assert.assertEquals(s1, s2);
-        Assert.assertTrue(s1 != s2);
-        a.put(s1, null);
-        b.put(s2, null);
     }
 
     @Test
     @Ignore("convenient trial tool for dev")
     public void test1() throws Exception {
+
         
System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433833611000L));
         
System.out.println(org.apache.kylin.common.util.DateFormat.formatToTimeStr(1433250517000L));
         
System.out.println(org.apache.kylin.common.util.DateFormat.stringToMillis("2015-06-01
 00:00:00"));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8e8187bb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index b606d2e..5bc4a00 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -66,6 +66,8 @@ import com.google.protobuf.HBaseZeroCopyByteString;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
+    private static ExecutorService executorService = 
Executors.newCachedThreadPool();
+
     static class EndpointResultsAsGTScanner implements IGTScanner {
         private GTInfo info;
         private Iterator<byte[]> blocks;
@@ -155,7 +157,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         final ByteString scanRequestBytesString = 
HBaseZeroCopyByteString.wrap(scanRequestBytes);
         logger.info("Serialized scanRequestBytes's size is " + 
scanRequestBytes.length);
 
-        ExecutorService executorService = 
Executors.newFixedThreadPool(rawScans.size());
         final List<byte[]> rowBlocks = 
Collections.synchronizedList(Lists.<byte[]> newArrayList());
 
         logger.info("Total RawScan range count: " + rawScans.size());
@@ -217,7 +218,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
             futures.add(future);
         }
-        executorService.shutdown();
         try {
             for (Future<?> future : futures) {
                 future.get(1, TimeUnit.HOURS);

Reply via email to