shaofengshi closed pull request #387: APACHE-KYLIN-2932: Simplify the thread 
model for in-memory cubing
URL: https://github.com/apache/kylin/pull/387
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the diff once such a PR is merged):

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 193329b29d..018552caf1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -808,6 +808,10 @@ public boolean isJobAutoReadyCubeEnabled() {
         return 
Boolean.parseBoolean(getOptional("kylin.job.cube-auto-ready-enabled", TRUE));
     }
 
+    public String getCubeInMemBuilderClass() {
+        return getOptional("kylin.job.cube-inmem-builder-class", 
"org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder");
+    }
+
     // 
============================================================================
     // SOURCE.HIVE
     // 
============================================================================
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
index c82f418cb9..df77978c8e 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 
 /**
  */
@@ -39,6 +40,13 @@ public void write(long cuboidId, GTRecord record) throws 
IOException {
             writer.write(cuboidId, record);
         }
     }
+    
+    @Override
+    public void write(long cuboidId, GridTable table) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, table);
+        }
+    }
 
     @Override
     public void flush() throws IOException {
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 8368051630..39dce26793 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -24,9 +24,9 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.PriorityQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Dictionary;
@@ -181,7 +181,7 @@ private void abort(List<SplitThread> splits) throws 
IOException {
         final RecordConsumeBlockingQueueController<?> inputController;
         final InMemCubeBuilder builder;
 
-        ConcurrentNavigableMap<Long, CuboidResult> buildResult;
+        NavigableMap<Long, CuboidResult> buildResult;
         RuntimeException exception;
 
         public SplitThread(final int num, final 
RecordConsumeBlockingQueueController<?> inputController) {
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
new file mode 100755
index 0000000000..93a7994471
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidGTTableWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ICuboidGTTableWriter implements ICuboidWriter{
+    
+    private static Logger logger = 
LoggerFactory.getLogger(ICuboidGTTableWriter.class);
+    
+    @Override
+    public void write(long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new 
GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
+        IGTScanner scanner = gridTable.scan(req);
+        for (GTRecord record : scanner) {
+            write(cuboidId, record);
+        }
+        scanner.close();
+        logger.info("Cuboid " + cuboidId + " output takes " + 
(System.currentTimeMillis() - startTime) + "ms");
+    }
+}
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 3f6cb0c6de..4ae182edb9 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 
 /**
  */
@@ -28,6 +29,8 @@
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
+    void write(long cuboidId, GridTable table) throws IOException;
+
     void flush() throws IOException;
 
     void close() throws IOException;
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e0bdb20905..9661fa8580 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -19,14 +19,13 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,6 +46,7 @@
 import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.measure.topn.Counter;
 import org.apache.kylin.measure.topn.TopNCounter;
 import org.apache.kylin.metadata.datatype.DoubleMutable;
@@ -111,7 +111,10 @@ private GridTable newGridTableByCuboidID(long cuboidID) 
throws IOException {
                 new CubeDimEncMap(cubeDesc, dictionaryMap)
         );
 
-        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        // The store implementations below are very similar in performance. 
The ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? 
MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, 
MemoryBudgetController.ZERO_BUDGET);
+        IGTStore store = new ConcurrentDiskStore(info);
 
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
@@ -120,7 +123,7 @@ private GridTable newGridTableByCuboidID(long cuboidID) 
throws IOException {
     @Override
     public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> 
inputConverterUnit, ICuboidWriter output)
             throws IOException {
-        ConcurrentNavigableMap<Long, CuboidResult> result = build(
+        NavigableMap<Long, CuboidResult> result = build(
                 
RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, 
input));
         try {
             for (CuboidResult cuboidResult : result.values()) {
@@ -132,9 +135,9 @@ private GridTable newGridTableByCuboidID(long cuboidID) 
throws IOException {
         }
     }
 
-    public <T> ConcurrentNavigableMap<Long, CuboidResult> 
build(RecordConsumeBlockingQueueController<T> input)
+    public <T> NavigableMap<Long, CuboidResult> 
build(RecordConsumeBlockingQueueController<T> input)
             throws IOException {
-        final ConcurrentNavigableMap<Long, CuboidResult> result = new 
ConcurrentSkipListMap<Long, CuboidResult>();
+        final NavigableMap<Long, CuboidResult> result = new 
ConcurrentSkipListMap<Long, CuboidResult>();
         build(input, new ICuboidCollector() {
             @Override
             public void collect(CuboidResult cuboidResult) {
@@ -213,7 +216,8 @@ private void join(Thread... threads) throws IOException {
     }
 
     private void throwExceptionIfAny() throws IOException {
-        ArrayList<Throwable> errors = new ArrayList<>();
+        List<Throwable> errors = Lists.newArrayList();
+
         for (int i = 0; i < taskThreadCount; i++) {
             Throwable t = taskThreadExceptions[i];
             if (t != null)
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
new file mode 100755
index 0000000000..cf54eb6aa9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/CuboidTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+@SuppressWarnings("serial")
+class CuboidTask extends RecursiveTask<CuboidResult> implements 
Comparable<CuboidTask> {
+    final CuboidResult parent;
+    final long childCuboidId;
+    final InMemCubeBuilder2 cubeBuilder;
+    
+    CuboidTask(CuboidResult parent, long childCuboidId, InMemCubeBuilder2 
cubeBuilder) {
+        this.parent = parent;
+        this.childCuboidId = childCuboidId;
+        this.cubeBuilder = cubeBuilder;
+    }
+
+    @Override
+    public int compareTo(CuboidTask o) {
+        long comp = this.childCuboidId - o.childCuboidId;
+        return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
+    }
+
+    @Override
+    protected CuboidResult compute() {
+        try {
+            return cubeBuilder.buildCuboid(this);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
new file mode 100755
index 0000000000..d7f738d2f6
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DefaultCuboidCollectorWithCallBack.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultCuboidCollectorWithCallBack implements 
ICuboidCollectorWithCallBack{
+    
+    private static Logger logger = 
LoggerFactory.getLogger(DefaultCuboidCollectorWithCallBack.class);
+    
+    final ConcurrentNavigableMap<Long, CuboidResult> result = new 
ConcurrentSkipListMap<Long, CuboidResult>();
+    final ICuboidResultListener listener;
+    
+    public DefaultCuboidCollectorWithCallBack(ICuboidResultListener listener){
+        this.listener = listener;
+    }
+
+    @Override
+    public void collectAndNotify(CuboidResult cuboidResult) {
+        logger.info("collecting CuboidResult cuboid id:" + 
cuboidResult.cuboidId);
+        result.put(cuboidResult.cuboidId, cuboidResult);
+        if (listener != null) {
+            listener.finish(cuboidResult);
+        }
+    }
+
+    @Override
+    public NavigableMap<Long, CuboidResult> getAllResult() {
+        return result;
+    }
+}
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
new file mode 100755
index 0000000000..4c5da87a46
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/DoggedCubeBuilder2.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.RecursiveTask;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+public class DoggedCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static Logger logger = 
LoggerFactory.getLogger(DoggedCubeBuilder2.class);
+
+    public DoggedCubeBuilder2(CuboidScheduler cuboidScheduler, 
IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+    }
+
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> 
inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        new BuildOnce().build(input, inputConverterUnit, output);
+    }
+
+    private class BuildOnce {
+        public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> 
inputConverterUnit, ICuboidWriter output)
+                throws IOException {
+            final RecordConsumeBlockingQueueController<T> inputController = 
RecordConsumeBlockingQueueController
+                    .getQueueController(inputConverterUnit, input);
+
+            final List<InMemCubeBuilder2> builderList = new 
CopyOnWriteArrayList<>();
+
+            ForkJoinWorkerThreadFactory factory = new 
ForkJoinWorkerThreadFactory() {
+                @Override
+                public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                    final ForkJoinWorkerThread worker = 
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    worker.setName("dogged-cubing-cuboid-worker-" + 
worker.getPoolIndex());
+                    return worker;
+                }
+            };
+
+            ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, 
factory, null, true);
+            CuboidResultWatcher resultWatcher = new 
CuboidResultWatcher(builderList, output);
+
+            Stopwatch sw = new Stopwatch();
+            sw.start();
+            logger.info("Dogged Cube Build2 start");
+            try {
+                BaseCuboidTask<T> task = new BaseCuboidTask<>(inputController, 
1, resultWatcher);
+                builderPool.execute(task);
+                do {
+                    builderList.add(task.getInternalBuilder());
+                    // An exception will be thrown here if cube building fails
+                    task.join();
+                    task = task.nextTask();
+                } while (task != null);
+
+                logger.info("Has finished feeding data, and base cuboid built, 
start to build child cuboids");
+                for (final InMemCubeBuilder2 builder : builderList) {
+                    builderPool.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            builder.startBuildFromBaseCuboid();
+                        }
+                    });
+                }
+                resultWatcher.start();
+                logger.info("Dogged Cube Build2 splits complete, took " + 
sw.elapsedMillis() + " ms");
+            } catch (Throwable e) {
+                logger.error("Dogged Cube Build2 error", e);
+                if (e instanceof Error)
+                    throw (Error) e;
+                else if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new IOException(e);
+            } finally {
+                output.close();
+                closeGirdTables(builderList);
+                sw.stop();
+                builderPool.shutdownNow();
+                logger.info("Dogged Cube Build2 end, totally took " + 
sw.elapsedMillis() + " ms");
+                logger.info("Dogged Cube Build2 return");
+            }
+        }
+
+        private void closeGirdTables(List<InMemCubeBuilder2> builderList) {
+            for (InMemCubeBuilder2 inMemCubeBuilder : builderList) {
+                for (CuboidResult cuboidResult : 
inMemCubeBuilder.getResultCollector().getAllResult().values()) {
+                    closeGirdTable(cuboidResult.table);
+                }
+            }
+        }
+
+        private void closeGirdTable(GridTable gridTable) {
+            try {
+                gridTable.close();
+            } catch (Throwable e) {
+                logger.error("Error closing grid table " + gridTable, e);
+            }
+        }
+    }
+
+    private class BaseCuboidTask<T> extends RecursiveTask<CuboidResult> {
+        private static final long serialVersionUID = -5408592502260876799L;
+
+        private final int splitSeq;
+        private final ICuboidResultListener resultListener;
+
+        private RecordConsumeBlockingQueueController<T> inputController;
+        private InMemCubeBuilder2 builder;
+
+        private volatile BaseCuboidTask<T> next;
+
+        public BaseCuboidTask(final RecordConsumeBlockingQueueController<T> 
inputController, int splitSeq,
+                ICuboidResultListener resultListener) {
+            this.inputController = inputController;
+            this.splitSeq = splitSeq;
+            this.resultListener = resultListener;
+            this.builder = new InMemCubeBuilder2(cuboidScheduler, flatDesc, 
dictionaryMap);
+            builder.setReserveMemoryMB(reserveMemoryMB);
+            builder.setConcurrentThreads(taskThreadCount);
+            logger.info("Split #" + splitSeq + " kickoff");
+        }
+
+        @Override
+        protected CuboidResult compute() {
+            try {
+                CuboidResult baseCuboidResult = 
builder.buildBaseCuboid(inputController, resultListener);
+                if (!inputController.ifEnd()) {
+                    next = new BaseCuboidTask<>(inputController, splitSeq + 1, 
resultListener);
+                    next.fork();
+                }
+                logger.info("Split #" + splitSeq + " finished");
+                return baseCuboidResult;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public InMemCubeBuilder2 getInternalBuilder() {
+            return builder;
+        }
+
+        public BaseCuboidTask<T> nextTask() {
+            return next;
+        }
+    }
+
+    /**
+     * Class responsible for watching the cube building results, monitoring the cube 
building process and triggering merge actions if required.
+     *
+     */
+    private class CuboidResultWatcher implements ICuboidResultListener {
+        final BlockingQueue<CuboidResult> outputQueue;
+        final Map<Long, List<CuboidResult>> pendingQueue = Maps.newHashMap();
+        final List<InMemCubeBuilder2> builderList;
+        final ICuboidWriter output;
+
+        public CuboidResultWatcher(final List<InMemCubeBuilder2> builderList, 
final ICuboidWriter output) {
+            this.outputQueue = Queues.newLinkedBlockingQueue();
+            this.builderList = builderList;
+            this.output = output;
+        }
+
+        public void start() throws IOException {
+            SplitMerger merger = new SplitMerger();
+            while (true) {
+                if (!outputQueue.isEmpty()) {
+                    List<CuboidResult> splitResultReturned = 
Lists.newArrayList();
+                    outputQueue.drainTo(splitResultReturned);
+                    for (CuboidResult splitResult : splitResultReturned) {
+                        if (builderList.size() == 1) {
+                            
merger.mergeAndOutput(Lists.newArrayList(splitResult), output);
+                        } else {
+                            List<CuboidResult> cuboidResultList = 
pendingQueue.get(splitResult.cuboidId);
+                            if (cuboidResultList == null) {
+                                cuboidResultList = 
Lists.newArrayListWithExpectedSize(builderList.size());
+                                cuboidResultList.add(splitResult);
+                                pendingQueue.put(splitResult.cuboidId, 
cuboidResultList);
+                            } else {
+                                cuboidResultList.add(splitResult);
+                            }
+                            if (cuboidResultList.size() == builderList.size()) 
{
+                                merger.mergeAndOutput(cuboidResultList, 
output);
+                                pendingQueue.remove(splitResult.cuboidId);
+                            }
+                        }
+                    }
+                }
+
+                boolean jobFinished = isAllBuildFinished();
+                if (outputQueue.isEmpty() && !jobFinished) {
+                    boolean ifWait = true;
+                    for (InMemCubeBuilder2 builder : builderList) {
+                        Queue<CuboidTask> queue = 
builder.getCompletedTaskQueue();
+                        while (queue.size() > 0) {
+                            CuboidTask childTask = queue.poll();
+                            if (childTask.isCompletedAbnormally()) {
+                                throw new 
RuntimeException(childTask.getException());
+                            }
+                            ifWait = false;
+                        }
+                    }
+                    if (ifWait) {
+                        try {
+                            Thread.sleep(100L);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                } else if (outputQueue.isEmpty() && pendingQueue.isEmpty() && 
jobFinished) {
+                    return;
+                }
+            }
+        }
+
+        private boolean isAllBuildFinished() {
+            for (InMemCubeBuilder2 split : builderList) {
+                if (!split.isAllCuboidDone()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public void finish(CuboidResult result) {
+            Stopwatch stopwatch = new Stopwatch().start();
+            int nRetries = 0;
+            while (!outputQueue.offer(result)) {
+                nRetries++;
+                long sleepTime = stopwatch.elapsedMillis();
+                if (sleepTime > 3600000L) {
+                    stopwatch.stop();
+                    throw new RuntimeException(
+                            "OutputQueue Full. Cannot offer to the output 
queue after waiting for one hour!!! Current queue size: "
+                                    + outputQueue.size());
+                }
+                logger.warn("OutputQueue Full. Queue size: " + 
outputQueue.size() + ". Total sleep time : " + sleepTime
+                        + ", and retry count : " + nRetries);
+                try {
+                    Thread.sleep(5000L);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            stopwatch.stop();
+        }
+    }
+
+    private class SplitMerger {
+        MeasureAggregators reuseAggrs;
+        Object[] reuseMetricsArray;
+        ByteArray reuseMetricsSpace;
+
+        long lastCuboidColumnCount;
+        ImmutableBitSet lastMetricsColumns;
+
+        SplitMerger() {
+            reuseAggrs = new MeasureAggregators(cubeDesc.getMeasures());
+            reuseMetricsArray = new Object[cubeDesc.getMeasures().size()];
+        }
+
+        public void mergeAndOutput(List<CuboidResult> splitResultList, 
ICuboidWriter output) throws IOException {
+            if (splitResultList.size() == 1) {
+                CuboidResult cuboidResult = splitResultList.get(0);
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, 
output);
+                return;
+            }
+            LinkedList<ResultMergeSlot> open = Lists.newLinkedList();
+            for (CuboidResult splitResult : splitResultList) {
+                open.add(new ResultMergeSlot(splitResult));
+            }
+
+            PriorityQueue<ResultMergeSlot> heap = new 
PriorityQueue<ResultMergeSlot>();
+            while (true) {
+                // ready records in open slots and add to heap
+                while (!open.isEmpty()) {
+                    ResultMergeSlot slot = open.removeFirst();
+                    if (slot.fetchNext()) {
+                        heap.add(slot);
+                    }
+                }
+
+                // find the smallest on heap
+                ResultMergeSlot smallest = heap.poll();
+                if (smallest == null)
+                    break;
+                open.add(smallest);
+
+                // merge with slots having the same key
+                if (smallest.isSameKey(heap.peek())) {
+                    Object[] metrics = 
getMetricsValues(smallest.currentRecord);
+                    reuseAggrs.reset();
+                    reuseAggrs.aggregate(metrics);
+                    do {
+                        ResultMergeSlot slot = heap.poll();
+                        open.add(slot);
+                        metrics = getMetricsValues(slot.currentRecord);
+                        reuseAggrs.aggregate(metrics);
+                    } while (smallest.isSameKey(heap.peek()));
+
+                    reuseAggrs.collectStates(metrics);
+                    setMetricsValues(smallest.currentRecord, metrics);
+                }
+                output.write(smallest.currentCuboidId, smallest.currentRecord);
+            }
+        }
+
+        private void setMetricsValues(GTRecord record, Object[] metricsValues) 
{
+            ImmutableBitSet metrics = getMetricsColumns(record);
+
+            if (reuseMetricsSpace == null) {
+                reuseMetricsSpace = new 
ByteArray(record.getInfo().getMaxColumnLength(metrics));
+            }
+
+            record.setValues(metrics, reuseMetricsSpace, metricsValues);
+        }
+
+        private Object[] getMetricsValues(GTRecord record) {
+            ImmutableBitSet metrics = getMetricsColumns(record);
+            return record.getValues(metrics, reuseMetricsArray);
+        }
+
+        private ImmutableBitSet getMetricsColumns(GTRecord record) {
+            // metrics columns always come after dimension columns
+            if (lastCuboidColumnCount == record.getInfo().getColumnCount())
+                return lastMetricsColumns;
+
+            int to = record.getInfo().getColumnCount();
+            int from = to - reuseMetricsArray.length;
+            lastCuboidColumnCount = record.getInfo().getColumnCount();
+            lastMetricsColumns = new ImmutableBitSet(from, to);
+            return lastMetricsColumns;
+        }
+    }
+
+    private static class ResultMergeSlot implements 
Comparable<ResultMergeSlot> {
+        CuboidResult splitResult;
+        IGTScanner scanner;
+        Iterator<GTRecord> recordIterator;
+
+        long currentCuboidId;
+        GTRecord currentRecord;
+
+        public ResultMergeSlot(CuboidResult splitResult) {
+            this.splitResult = splitResult;
+        }
+
+        public boolean fetchNext() throws IOException {
+            if (recordIterator == null) {
+                currentCuboidId = splitResult.cuboidId;
+                scanner = splitResult.table.scan(new 
GTScanRequestBuilder().setInfo(splitResult.table.getInfo())
+                        
.setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
+                recordIterator = scanner.iterator();
+            }
+
+            if (recordIterator.hasNext()) {
+                currentRecord = recordIterator.next();
+                return true;
+            } else {
+                scanner.close();
+                recordIterator = null;
+                return false;
+            }
+        }
+
+        @Override
+        public int compareTo(ResultMergeSlot o) {
+            long cuboidComp = this.currentCuboidId - o.currentCuboidId;
+            if (cuboidComp != 0)
+                return cuboidComp < 0 ? -1 : 1;
+
+            // note GTRecord.equals() doesn't work because the two GTRecords 
come from different GridTables
+            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
+            for (int i = 0; i < pk.trueBitCount(); i++) {
+                int c = pk.trueBitAt(i);
+                int comp = 
this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
+                if (comp != 0)
+                    return comp;
+            }
+            return 0;
+        }
+
+        public boolean isSameKey(ResultMergeSlot o) {
+            if (o == null)
+                return false;
+            else
+                return this.compareTo(o) == 0;
+        }
+
+    };
+}
\ No newline at end of file
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
new file mode 100755
index 0000000000..b669bbebad
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidCollectorWithCallBack.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Receives {@link CuboidResult}s one at a time and hands them back as a
+ * {@link NavigableMap}.
+ * NOTE(review): the Long key is presumably the cuboid id -- confirm against the implementation.
+ */
+public interface ICuboidCollectorWithCallBack {
+    /** Accepts one cuboid result; per the name, implementations also notify a callback -- TODO confirm. */
+    void collectAndNotify(CuboidResult result);
+    /** Returns the results held by this collector as a navigable map keyed by Long. */
+    NavigableMap<Long, CuboidResult> getAllResult();
+}
\ No newline at end of file
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
new file mode 100755
index 0000000000..6d80f00e33
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/ICuboidResultListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+
+/**
+ * Callback taking a single {@link CuboidResult}.
+ * NOTE(review): presumably invoked once per finished cuboid -- confirm against the caller.
+ */
+public interface ICuboidResultListener {
+    /** Hands the given cuboid result to the listener. */
+    void finish(CuboidResult cuboidResult);
+}
\ No newline at end of file
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
new file mode 100755
index 0000000000..35a4d09d2b
--- /dev/null
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing2/InMemCubeBuilder2.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
+import org.apache.kylin.cube.inmemcubing.CuboidResult;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilderUtils;
+import org.apache.kylin.cube.inmemcubing.InputConverter;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.RecordConsumeBlockingQueueController;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the 
same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
+ */
+public class InMemCubeBuilder2 extends AbstractInMemCubeBuilder {
+    private static Logger logger = 
LoggerFactory.getLogger(InMemCubeBuilder2.class);
+
+    // by experience
+    private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
+    private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
+
+    protected final String[] metricsAggrFuncs;
+    protected final MeasureDesc[] measureDescs;
+    protected final int measureCount;
+
+    private MemoryBudgetController memBudget;
+    protected final long baseCuboidId;
+    private CuboidResult baseResult;
+
+    private Queue<CuboidTask> completedTaskQueue;
+    private AtomicInteger taskCuboidCompleted;
+
+    private ICuboidCollectorWithCallBack resultCollector;
+
+    /**
+     * Captures the cube's measures, the aggregation expression of each measure
+     * (in measure order) and the base cuboid id.
+     */
+    public InMemCubeBuilder2(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc,
+            Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        super(cuboidScheduler, flatDesc, dictionaryMap);
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+
+        // one aggregation expression per measure, same order as measureDescs
+        final String[] aggrFuncs = new String[measureCount];
+        for (int idx = 0; idx < measureCount; idx++) {
+            aggrFuncs[idx] = measureDescs[idx].getFunction().getExpression();
+        }
+        this.metricsAggrFuncs = aggrFuncs;
+        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+    }
+
+    /**
+     * Returns the aggregation cache size in MB recorded on the base cuboid result.
+     * Dereferences {@code baseResult}, which is only assigned by buildBaseCuboid().
+     */
+    public int getBaseResultCacheMB() {
+        return baseResult.aggrCacheMB;
+    }
+
+    /**
+     * Creates an empty grid table for the given cuboid, backed by a fresh
+     * {@link ConcurrentDiskStore}.
+     */
+    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+        final GTInfo gtInfo = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID),
+                new CubeDimEncMap(cubeDesc, dictionaryMap));
+
+        // The candidate stores (MemDiskStore with or without a memory budget, ConcurrentDiskStore)
+        // are very similar in performance; ConcurrentDiskStore is the simplest.
+        final IGTStore diskStore = new ConcurrentDiskStore(gtInfo);
+        return new GridTable(gtInfo, diskStore);
+    }
+
+    /**
+     * Builds every cuboid from the queued input, writes each resulting grid table to
+     * {@code output} and closes {@code output} once writing ends, successfully or not.
+     *
+     * @param input queue of raw input records
+     * @param inputConverterUnit converter paired with the input queue
+     * @param output writer receiving each cuboid; closed before this method returns
+     *        whenever the build phase itself succeeded
+     * @throws IOException if building or writing a cuboid fails
+     */
+    @Override
+    public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output)
+            throws IOException {
+        NavigableMap<Long, CuboidResult> result = buildAndCollect(
+                RecordConsumeBlockingQueueController.getQueueController(inputConverterUnit, input), null);
+        try {
+            for (CuboidResult cuboidResult : result.values()) {
+                try {
+                    outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                } finally {
+                    // release the table being written even when writing it fails
+                    cuboidResult.table.close();
+                }
+            }
+        } finally {
+            output.close();
+        }
+    }
+
+    /**
+     * Builds the base cuboid, then all cuboids derived from it, and waits for every
+     * task to finish.
+     *
+     * @param input controller over the queue of input records feeding the base cuboid
+     * @param listener handed to the result collector; build() passes {@code null}
+     * @return all results held by the result collector
+     * @throws IOException if building the base cuboid fails
+     */
+    private <T> NavigableMap<Long, CuboidResult> buildAndCollect(final RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+
+        long startTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 start, " + cubeDesc.getName());
+
+        // build base cuboid
+        buildBaseCuboid(input, listener);
+
+        // An empty base cuboid makes buildBaseCuboid() return before the memory budget is
+        // created and marks every cuboid as completed, so no child task may be started.
+        if (baseResult.nRows > 0) {
+            ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
+                @Override
+                public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+                    final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    worker.setName("inmem-cubing-cuboid-worker-" + worker.getPoolIndex());
+                    return worker;
+                }
+            };
+            ForkJoinPool builderPool = new ForkJoinPool(taskThreadCount, factory, null, true);
+            try {
+                ForkJoinTask<?> rootTask = builderPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        startBuildFromBaseCuboid();
+                    }
+                });
+                // the root task forks and joins all descendants, so this waits for the whole tree
+                rootTask.join();
+            } finally {
+                // the pool is private to this build; release its worker threads
+                builderPool.shutdown();
+            }
+        }
+
+        long endTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build2 end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+        logger.info("total CuboidResult count:" + resultCollector.getAllResult().size());
+        return resultCollector.getAllResult();
+    }
+
+    /**
+     * Returns the collector created by the most recent buildBaseCuboid() call;
+     * {@code null} if buildBaseCuboid() has not run yet.
+     */
+    public ICuboidCollectorWithCallBack getResultCollector() {
+        return resultCollector;
+    }
+
+    /**
+     * Resets the per-build state (completed-task queue, completion counter, result
+     * collector), aggregates the input into the base cuboid and, unless that cuboid is
+     * empty, sizes its aggregation cache and creates the memory budget.
+     * <p>
+     * For an empty base cuboid the completion counter is set to the total cuboid count
+     * and the memory budget is not created.
+     *
+     * @param input controller over the queue of input records
+     * @param listener handed to the new result collector
+     * @return the base cuboid result
+     * @throws IOException if the base cuboid cannot be built
+     */
+    public <T> CuboidResult buildBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            final ICuboidResultListener listener) throws IOException {
+        completedTaskQueue = new LinkedBlockingQueue<CuboidTask>();
+        taskCuboidCompleted = new AtomicInteger(0);
+
+        resultCollector = new DefaultCuboidCollectorWithCallBack(listener);
+
+        final MemoryBudgetController.MemoryWaterLevel memTracker = new MemoryWaterLevel();
+        memTracker.markLow();
+        baseResult = createBaseCuboid(input, memTracker);
+
+        final boolean emptyBase = baseResult.nRows == 0;
+        if (emptyBase) {
+            // nothing to derive from an empty base cuboid: report every cuboid as completed
+            taskCuboidCompleted.set(cuboidScheduler.getCuboidCount());
+        } else {
+            memTracker.markLow();
+            // 10 MB at minimal
+            baseResult.aggrCacheMB = Math.max(memTracker.getEstimateMB(), 10);
+            makeMemoryBudget();
+        }
+        return baseResult;
+    }
+
+    /**
+     * Builds the child cuboid named by {@code task} from {@code task.parent}, records
+     * the task in the completed-task queue, then forks and joins tasks for the new
+     * cuboid's own children before returning the new cuboid's result.
+     *
+     * @throws IOException if aggregating the cuboid fails
+     */
+    public CuboidResult buildCuboid(CuboidTask task) throws IOException {
+        CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+        completedTaskQueue.add(task);
+        addChildTasks(newCuboid);
+        return newCuboid;
+    }
+
+    /**
+     * Aggregates {@code cuboidId} out of {@code parent} while holding a memory
+     * reservation of the parent's aggregation cache size; the reservation is reset to
+     * zero once aggregation ends, whether it succeeded or threw.
+     */
+    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final String label = "AggrCache@Cuboid " + cuboidId;
+        final MemoryBudgetController.MemoryConsumer aggrCacheConsumer = new MemoryBudgetController.MemoryConsumer() {
+            @Override
+            public int freeUp(int mb) {
+                // cannot free up on demand
+                return 0;
+            }
+
+            @Override
+            public String toString() {
+                return label;
+            }
+        };
+
+        // reserve memory for aggregation cache, can't be larger than the parent
+        memBudget.reserveInsist(aggrCacheConsumer, parent.aggrCacheMB);
+        try {
+            return aggregateCuboid(parent, cuboidId);
+        } finally {
+            memBudget.reserve(aggrCacheConsumer, 0);
+        }
+    }
+
+    /** Returns true when the completed-cuboid counter equals the scheduler's cuboid count. */
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() == cuboidScheduler.getCuboidCount();
+    }
+
+    /**
+     * Forks and joins the tasks for the base cuboid's children (see addChildTasks).
+     * Requires buildBaseCuboid() to have set {@code baseResult}; buildAndCollect()
+     * runs this inside its ForkJoinPool.
+     */
+    public void startBuildFromBaseCuboid() {
+        addChildTasks(baseResult);
+    }
+
+    /**
+     * Forks one {@link CuboidTask} per cuboid the scheduler spans from {@code parent},
+     * then joins each of them in fork order. Does nothing when there are no children.
+     */
+    private void addChildTasks(CuboidResult parent) {
+        final List<Long> childIds = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+        if (childIds == null || childIds.isEmpty()) {
+            return;
+        }
+
+        final List<CuboidTask> forked = Lists.newArrayListWithExpectedSize(childIds.size());
+        for (Long childId : childIds) {
+            CuboidTask childTask = new CuboidTask(parent, childId, this);
+            forked.add(childTask);
+            childTask.fork();
+        }
+        for (CuboidTask pending : forked) {
+            pending.join();
+        }
+    }
+
+    /** Returns the queue to which buildCuboid(CuboidTask) adds each task once its cuboid is built. */
+    public Queue<CuboidTask> getCompletedTaskQueue() {
+        return completedTaskQueue;
+    }
+
+    /**
+     * Creates the memory budget controller: available system memory minus the
+     * configured reservation, but never less than the base cuboid's aggregation cache.
+     */
+    private void makeMemoryBudget() {
+        final int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB();
+        logger.info("System avail " + systemAvailMB + " MB");
+        final int reserve = reserveMemoryMB;
+        logger.info("Reserve " + reserve + " MB for system basics");
+
+        final int afterReserve = systemAvailMB - reserve;
+        final int budget;
+        if (afterReserve >= baseResult.aggrCacheMB) {
+            budget = afterReserve;
+        } else {
+            // make sure we have base aggr cache as minimal
+            budget = baseResult.aggrCacheMB;
+            logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache ("
+                    + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve
+                    + " MB), consider increase JVM heap -Xmx");
+        }
+
+        logger.info("Memory Budget is " + budget + " MB");
+        memBudget = new MemoryBudgetController(budget);
+    }
+
+    /**
+     * Aggregates the incoming records into the base cuboid's grid table.
+     * <p>
+     * The aggregate scanner and the table builder are closed in a {@code finally}
+     * block, matching scanAndAggregateGridTable(), so neither leaks when reading or
+     * writing a record fails.
+     *
+     * @param input controller over the queue of input records
+     * @param baseCuboidMemTracker tracker attached to the aggregate scanner; marked high
+     *        when the first aggregated record is about to be written
+     * @return the base cuboid result; it is handed to the result collector only when
+     *         {@code input.inputConverterUnit.ifChange()} is true
+     * @throws IOException if the grid table cannot be created or written
+     */
+    private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input,
+            MemoryBudgetController.MemoryWaterLevel baseCuboidMemTracker) throws IOException {
+        logger.info("Calculating base cuboid " + baseCuboidId);
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+        GTBuilder baseBuilder = baseCuboid.rebuild();
+        IGTScanner baseInput = new InputConverter<>(baseCuboid.getInfo(), input);
+
+        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond())
+                .setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+        aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
+
+        int count = 0;
+        try {
+            for (GTRecord r : aggregationScanner) {
+                if (count == 0) {
+                    baseCuboidMemTracker.markHigh();
+                }
+                baseBuilder.write(r);
+                count++;
+            }
+        } finally {
+            // close both even if one of them, or the loop above, throws
+            try {
+                aggregationScanner.close();
+            } finally {
+                baseBuilder.close();
+            }
+        }
+
+        sw.stop();
+        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + sw.elapsedMillis() + "ms");
+
+        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache()
+                / MemoryBudgetController.ONE_MB);
+        logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
+
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, sw.elapsedMillis(), 0,
+                input.inputConverterUnit.ifChange());
+    }
+
+    /** Shorthand for the six-argument overload with {@code ifCollect} set to {@code true}. */
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent,
+            int aggrCacheMB) {
+        final boolean collectResult = true;
+        return updateCuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB, collectResult);
+    }
+
+    /**
+     * Wraps a finished cuboid in a {@link CuboidResult}, increments the completed
+     * counter and, when {@code ifCollect} is true, passes the result to the collector.
+     * <p>
+     * A non-positive {@code aggrCacheMB} is replaced by an estimate derived from the
+     * base cuboid (constant part plus a part proportional to the row-count ratio),
+     * provided the base result already exists.
+     */
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB,
+            boolean ifCollect) {
+        int cacheMB = aggrCacheMB;
+        if (cacheMB <= 0 && baseResult != null) {
+            final double variablePart = DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows;
+            final double factor = DERIVE_AGGR_CACHE_CONSTANT_FACTOR + variablePart;
+            cacheMB = (int) Math.round(factor * baseResult.aggrCacheMB);
+        }
+
+        final CuboidResult cuboidResult = new CuboidResult(cuboidId, table, nRows, timeSpent, cacheMB);
+        taskCuboidCompleted.incrementAndGet();
+
+        if (ifCollect) {
+            resultCollector.collectAndNotify(cuboidResult);
+        }
+        return cuboidResult;
+    }
+
+    /**
+     * Derives {@code cuboidId} from {@code parent}: resolves the columns needed from
+     * the parent, creates a new grid table for the child and aggregates into it.
+     */
+    protected CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final Pair<ImmutableBitSet, ImmutableBitSet> neededColumns = InMemCubeBuilderUtils
+                .getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount);
+        final GridTable childTable = newGridTableByCuboidID(cuboidId);
+        return scanAndAggregateGridTable(parent.table, childTable, parent.cuboidId, cuboidId,
+                neededColumns.getFirst(), neededColumns.getSecond());
+    }
+
+    /**
+     * Opens an aggregating scan over {@code gridTable}, grouped by
+     * {@code aggregationColumns} and aggregating {@code measureColumns}. When the
+     * target differs from the parent, an aggregation mask is set that switches off
+     * measures whose type reports {@code onlyAggrInBaseCuboid()}.
+     */
+    private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId,
+            ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        final GTInfo gtInfo = gridTable.getInfo();
+        final GTScanRequest request = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs)
+                .setFilterPushDown(null).createGTScanRequest();
+        final GTAggregateScanner aggrScanner = (GTAggregateScanner) gridTable.scan(request);
+
+        if (parentId == cuboidId) {
+            return aggrScanner;
+        }
+
+        // for child cuboid, some measures don't need aggregation.
+        final boolean[] aggrMask = new boolean[measureDescs.length];
+        for (int m = 0; m < measureDescs.length; m++) {
+            final boolean needsAggr = !measureDescs[m].getFunction().getMeasureType().onlyAggrInBaseCuboid();
+            aggrMask[m] = needsAggr;
+            if (!needsAggr) {
+                logger.info(measureDescs[m].toString() + " doesn't need aggregation.");
+            }
+        }
+        aggrScanner.setAggrMask(aggrMask);
+        return aggrScanner;
+    }
+
+    /**
+     * Scans {@code gridTable} with aggregation and writes every aggregated row into
+     * {@code newGridTable}, copying the needed columns (group-by columns or-ed with
+     * measure columns) into consecutive positions of the new record. The scanner and
+     * the builder are closed even when the copy loop throws.
+     *
+     * @return the result registered for {@code cuboidId}
+     */
+    protected CuboidResult scanAndAggregateGridTable(GridTable gridTable, GridTable newGridTable, long parentId,
+            long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        final Stopwatch timer = new Stopwatch();
+        timer.start();
+        logger.info("Calculating cuboid " + cuboidId);
+
+        final GTAggregateScanner aggrScanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId,
+                aggregationColumns, measureColumns);
+        final GTBuilder tableBuilder = newGridTable.rebuild();
+
+        final ImmutableBitSet neededColumns = aggregationColumns.or(measureColumns);
+        final int neededCount = neededColumns.trueBitCount();
+
+        final GTRecord target = new GTRecord(newGridTable.getInfo());
+        int rowCount = 0;
+        try {
+            for (GTRecord source : aggrScanner) {
+                rowCount++;
+                for (int pos = 0; pos < neededCount; pos++) {
+                    target.set(pos, source.get(neededColumns.trueBitAt(pos)));
+                }
+                tableBuilder.write(target);
+            }
+        } finally {
+            aggrScanner.close();
+            tableBuilder.close();
+        }
+        timer.stop();
+        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, build takes " + timer.elapsedMillis() + "ms");
+
+        return updateCuboidResult(cuboidId, newGridTable, rowCount, timer.elapsedMillis(), 0);
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
index 1beebc790e..fc6edd3617 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -20,31 +20,21 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class InMemCuboidFromBaseCuboidMapper
         extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, 
ByteArrayWritable, ByteArray> {
@@ -75,16 +65,8 @@ protected void doSetup(Mapper.Context context) throws 
IOException {
     }
 
     @Override
-    protected Future getCubingThreadFuture(Context context, Map<TblColRef, 
Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
-        AbstractInMemCubeBuilder cubeBuilder = new 
DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
-        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
-        cubeBuilder.setConcurrentThreads(taskThreadCount);
-
-        ExecutorService executorService = 
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-                
.setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
-        return executorService.submit(cubeBuilder.buildAsRunnable(queue, 
inputConverterUnit,
-                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    protected ICuboidWriter getCuboidWriter(Context context) {
+        return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
     }
 
     @Override
@@ -98,5 +80,4 @@ protected ByteArray getRecordFromKeyValue(Text key, Text 
value) {
 
         return new ByteArray(keyValue);
     }
-
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 551a17b7d8..d363afc73f 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -19,24 +19,15 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class InMemCuboidMapper<KEYIN>
         extends InMemCuboidMapperBase<KEYIN, Object, ByteArrayWritable, 
ByteArrayWritable, String[]> {
@@ -63,15 +54,7 @@ protected void doSetup(Context context) throws IOException {
     }
 
     @Override
-    protected Future getCubingThreadFuture(Context context, Map<TblColRef, 
Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler) {
-        AbstractInMemCubeBuilder cubeBuilder = new 
DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
-        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
-        cubeBuilder.setConcurrentThreads(taskThreadCount);
-
-        ExecutorService executorService = Executors.newSingleThreadExecutor(
-                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
-        return executorService.submit(cubeBuilder.buildAsRunnable(queue, 
inputConverterUnit,
-                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    protected ICuboidWriter getCuboidWriter(Context context) {
+        return new MapContextGTRecordWriter(context, cubeDesc, cubeSegment);
     }
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index e95ce8a3be..ce08b5cf1a 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -21,10 +21,13 @@
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
@@ -33,7 +36,10 @@
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
@@ -74,11 +80,10 @@
 
     protected abstract InputConverterUnit<T> getInputConverterUnit(Context 
context);
 
-    protected abstract Future getCubingThreadFuture(Context context, 
Map<TblColRef, Dictionary<String>> dictionaryMap,
-            int reserveMemoryMB, CuboidScheduler cuboidScheduler);
-
     protected abstract T getRecordFromKeyValue(KEYIN key, VALUEIN value);
 
+    protected abstract ICuboidWriter getCuboidWriter(Context context);
+
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -106,7 +111,24 @@ protected void doSetup(Context context) throws IOException 
{
         taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
         reserveMemoryMB = calculateReserveMB(conf);
         inputConverterUnit = getInputConverterUnit(context);
-        future = getCubingThreadFuture(context, dictionaryMap, 
reserveMemoryMB, cuboidScheduler);
+
+        AbstractInMemCubeBuilder cubeBuilder;
+        try {
+            cubeBuilder = (AbstractInMemCubeBuilder) 
Class.forName(cubeSegment.getConfig().getCubeInMemBuilderClass())
+                    .getConstructor(CuboidScheduler.class, 
IJoinedFlatTableDesc.class, Map.class)
+                    .newInstance(cuboidScheduler, flatDesc, dictionaryMap);
+        } catch (Exception e) {
+            logger.warn("Fail to initialize cube builder by class name "
+                    + cubeSegment.getConfig().getCubeInMemBuilderClass() + " 
due to " + e);
+            cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, 
dictionaryMap);
+        }
+        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+        cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor(
+                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("inmemory-cube-building-mapper-%d").build());
+        future = executorService
+                .submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit, 
getCuboidWriter(context)));
     }
 
     private int calculateReserveMB(Configuration configuration) {
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index 60d0870518..43b7ee2230 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -25,7 +25,7 @@
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.ICuboidGTTableWriter;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
@@ -36,7 +36,7 @@
 
 /**
  */
-public abstract class KVGTRecordWriter implements ICuboidWriter {
+public abstract class KVGTRecordWriter extends ICuboidGTTableWriter {
 
     private static final Logger logger = 
LoggerFactory.getLogger(KVGTRecordWriter.class);
 
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
 
b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
index be3d7592f8..695455b82b 100644
--- 
a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
+++ 
b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java
@@ -32,6 +32,7 @@
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -100,6 +101,11 @@ public void test() throws Exception {
         public void write(long cuboidId, GTRecord record) throws IOException {
         }
 
+        @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+
+        }
+
         @Override
         public void flush() {
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 8fcf9ed9dd..6cfec84fbc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -37,8 +37,11 @@
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing2.DoggedCubeBuilder2;
+import org.apache.kylin.cube.inmemcubing2.InMemCubeBuilder2;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -48,6 +51,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 /**
  */
 public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
@@ -89,10 +94,19 @@ public void test() throws Exception {
         long randSeed = System.currentTimeMillis();
 
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
+        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        inmemBuilder.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult = new FileRecordWriter();
+        {
+            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+            inmemResult.close();
+        }
+
         DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
         FileRecordWriter doggedResult = new FileRecordWriter();
-
         {
             Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
             ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
@@ -100,20 +114,34 @@ public void test() throws Exception {
             doggedResult.close();
         }
 
-        InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
-        inmemBuilder.setConcurrentThreads(THREADS);
-        FileRecordWriter inmemResult = new FileRecordWriter();
-
+        InMemCubeBuilder2 inmemBuilder2 = new InMemCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        inmemBuilder2.setConcurrentThreads(THREADS);
+        FileRecordWriter inmemResult2 = new FileRecordWriter();
         {
-            Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult));
+            Future<?> future = executorService.submit(inmemBuilder2.buildAsRunnable(queue, inmemResult2));
             ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
             future.get();
-            inmemResult.close();
+            inmemResult2.close();
         }
 
+        DoggedCubeBuilder2 doggedBuilder2 = new DoggedCubeBuilder2(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
+        doggedBuilder2.setConcurrentThreads(THREADS);
+        FileRecordWriter doggedResult2 = new FileRecordWriter();
+        {
+            Future<?> future = executorService.submit(doggedBuilder2.buildAsRunnable(queue, doggedResult2));
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS);
+            future.get();
+            doggedResult2.close();
+        }
+
+        fileCompare(inmemResult.file, inmemResult2.file);
         fileCompare(inmemResult.file, doggedResult.file);
-        doggedResult.file.delete();
+        fileCompare2(inmemResult.file, doggedResult2.file);
+
         inmemResult.file.delete();
+        inmemResult2.file.delete();
+        doggedResult.file.delete();
+        doggedResult2.file.delete();
     }
 
     private void fileCompare(File file, File file2) throws IOException {
@@ -133,6 +161,27 @@ private void fileCompare(File file, File file2) throws IOException {
         r2.close();
     }
 
+    private void fileCompare2(File file, File file2) throws IOException {
+        Map<String, Integer> content1 = readContents(file);
+        Map<String, Integer> content2 = readContents(file2);
+        assertEquals(content1, content2);
+    }
+
+    private Map<String, Integer> readContents(File file) throws IOException {
+        BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+        Map<String, Integer> content = Maps.newHashMap();
+        String line;
+        while ((line = r.readLine()) != null) {
+            Integer cnt = content.get(line);
+            if (cnt == null) {
+                cnt = 0;
+            }
+            content.put(line, cnt + 1);
+        }
+        r.close();
+        return content;
+    }
+
     class FileRecordWriter implements ICuboidWriter {
 
         File file;
@@ -151,6 +200,14 @@ public void write(long cuboidId, GTRecord record) throws IOException {
             writer.println();
         }
 
+        @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+            writer.print(cuboidId);
+            writer.print(", ");
+            writer.print(table.toString());
+            writer.println();
+        }
+
         @Override
         public void flush() {
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 2a96b3915d..035331397b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -45,6 +45,7 @@
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -275,6 +276,12 @@ public void write(long cuboidId, GTRecord record) throws IOException {
                 System.out.println(record.toString());
         }
 
+        @Override
+        public void write(long cuboidId, GridTable table) throws IOException {
+            if (verbose)
+                System.out.println(table.toString());
+        }
+
         @Override
         public void flush() {
             if (verbose) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to