http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java 
b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index e399a70..0ba6566 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -21,15 +21,14 @@ package org.apache.kylin.dict;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.*;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.*;
 import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,7 @@ public class DictionaryGenerator {
 
     private static final Logger logger = 
LoggerFactory.getLogger(DictionaryGenerator.class);
 
-    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", 
"yyyyMMdd" };
+    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", 
"yyyyMMdd" };
 
     private static int getDictionaryMaxCardinality() {
         try {
@@ -53,20 +52,20 @@ public class DictionaryGenerator {
         }
     }
 
-    public static Dictionary<?> 
buildDictionaryFromValueEnumerator(DictionaryInfo info, 
IDictionaryValueEnumerator valueEnumerator) throws IOException{
-        Dictionary dict = null;
+    public static org.apache.kylin.common.util.Dictionary<?> 
buildDictionaryFromValueEnumerator(DictionaryInfo info, 
IDictionaryValueEnumerator valueEnumerator) throws IOException{
+        org.apache.kylin.common.util.Dictionary dict = null;
         int baseId = 0; // always 0 for now
-        final int nSamples = 5;
-        ArrayList samples = Lists.newArrayListWithCapacity(nSamples);
+        final int nSamples = 5;
+        ArrayList samples = Lists.newArrayListWithCapacity(nSamples);
 
         // build dict, case by data type
         DataType dataType = DataType.getInstance(info.getDataType());
         if (dataType.isDateTimeFamily())
-            dict = buildDateStrDict(valueEnumerator, baseId, nSamples, 
samples);
+            dict = buildDateStrDict(valueEnumerator, baseId, nSamples, 
samples);
         else if (dataType.isNumberFamily())
-            dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples);
+            dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples);
         else
-            dict = buildStringDict(valueEnumerator, baseId, nSamples, samples);
+            dict = buildStringDict(valueEnumerator, baseId, nSamples, samples);
 
         // log a few samples
         StringBuilder buf = new StringBuilder();
@@ -76,54 +75,54 @@ public class DictionaryGenerator {
             
buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
         }
         logger.info("Dictionary value samples: " + buf.toString());
-        logger.info("Dictionary cardinality: " + dict.getSize());
+        logger.info("Dictionary cardinality: " + dict.getSize());
 
-        if (dict instanceof TrieDictionary && dict.getSize() > 
DICT_MAX_CARDINALITY)
-            throw new IllegalArgumentException("Too high cardinality is not 
suitable for dictionary -- " + info.getSourceTable() + "." + 
info.getSourceColumn() + " cardinality: " + dict.getSize());
+        if (dict instanceof TrieDictionary && dict.getSize() > 
DICT_MAX_CARDINALITY)
+            throw new IllegalArgumentException("Too high cardinality is not 
suitable for dictionary -- " + info.getSourceTable() + "." + 
info.getSourceColumn() + " cardinality: " + dict.getSize());
 
         return dict;
     }
 
-    public static Dictionary mergeDictionaries(DictionaryInfo targetInfo, 
List<DictionaryInfo> sourceDicts) throws IOException {
-        return buildDictionaryFromValueEnumerator(targetInfo, new 
MultipleDictionaryValueEnumerator(sourceDicts));
+    public static org.apache.kylin.common.util.Dictionary 
mergeDictionaries(DictionaryInfo targetInfo, List<DictionaryInfo> sourceDicts) 
throws IOException {
+        return buildDictionaryFromValueEnumerator(targetInfo, new 
MultipleDictionaryValueEnumerator(sourceDicts));
     }
 
-    public static Dictionary<?> buildDictionary(DictionaryInfo info, 
ReadableTable inpTable) throws IOException {
+    public static org.apache.kylin.common.util.Dictionary<?> 
buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException 
{
 
         // currently all data types are casted to string to build dictionary
         // String dataType = info.getDataType();
 
-        IDictionaryValueEnumerator columnValueEnumerator = null;
-        try {
-            logger.info("Building dictionary " + 
JsonUtil.writeValueAsString(info));
+        IDictionaryValueEnumerator columnValueEnumerator = null;
+        try {
+            logger.info("Building dictionary " + 
JsonUtil.writeValueAsString(info));
 
-            columnValueEnumerator = new 
TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
-            return buildDictionaryFromValueEnumerator(info, 
columnValueEnumerator);
-        } finally {
-            if (columnValueEnumerator != null)
-                columnValueEnumerator.close();
-        }
+            columnValueEnumerator = new 
TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
+            return buildDictionaryFromValueEnumerator(info, 
columnValueEnumerator);
+        } finally {
+            if (columnValueEnumerator != null)
+                columnValueEnumerator.close();
+        }
     }
 
-    private static Dictionary buildDateStrDict(IDictionaryValueEnumerator 
valueEnumerator, int baseId, int nSamples, ArrayList samples) throws 
IOException {
+    private static org.apache.kylin.common.util.Dictionary 
buildDateStrDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int 
nSamples, ArrayList samples) throws IOException {
         final int BAD_THRESHOLD = 2;
         String matchPattern = null;
-        byte[] value;
+        byte[] value;
 
         for (String ptn : DATE_PATTERNS) {
             matchPattern = ptn; // be optimistic
             int badCount = 0;
             SimpleDateFormat sdf = new SimpleDateFormat(ptn);
-
-            while (valueEnumerator.moveNext()) {
-                value = valueEnumerator.current();
+
+            while (valueEnumerator.moveNext()) {
+                value = valueEnumerator.current();
                 if (value.length == 0)
                     continue;
 
                 String str = Bytes.toString(value);
                 try {
                     sdf.parse(str);
-                    if (samples.size() < nSamples && !samples.contains(str))
+                    if (samples.size() < nSamples && !samples.contains(str))
                         samples.add(str);
                 } catch (ParseException e) {
                     logger.info("Unrecognized datetime value: " + str);
@@ -134,33 +133,33 @@ public class DictionaryGenerator {
                     }
                 }
             }
-            if (matchPattern != null) {
+            if (matchPattern != null) {
                 return new DateStrDictionary(matchPattern, baseId);
-            }
+            }
         }
         throw new IllegalStateException("Unrecognized datetime value");
     }
 
-    private static Dictionary buildStringDict(IDictionaryValueEnumerator 
valueEnumerator, int baseId, int nSamples, ArrayList samples) throws 
IOException {
+    private static org.apache.kylin.common.util.Dictionary 
buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int 
nSamples, ArrayList samples) throws IOException {
         TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new 
StringBytesConverter());
-        byte[] value;
-        while (valueEnumerator.moveNext()) {
-            value = valueEnumerator.current();
+        byte[] value;
+        while (valueEnumerator.moveNext()) {
+            value = valueEnumerator.current();
             if (value == null)
                 continue;
             String v = Bytes.toString(value);
             builder.addValue(v);
-            if (samples.size() < nSamples && !samples.contains(v))
+            if (samples.size() < nSamples && !samples.contains(v))
                 samples.add(v);
         }
         return builder.build(baseId);
     }
 
-    private static Dictionary buildNumberDict(IDictionaryValueEnumerator 
valueEnumerator, int baseId, int nSamples, ArrayList samples) throws 
IOException {
+    private static org.apache.kylin.common.util.Dictionary 
buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int 
nSamples, ArrayList samples) throws IOException {
         NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new 
StringBytesConverter());
-        byte[] value;
-        while (valueEnumerator.moveNext()) {
-            value = valueEnumerator.current();
+        byte[] value;
+        while (valueEnumerator.moveNext()) {
+            value = valueEnumerator.current();
             if (value == null)
                 continue;
             String v = Bytes.toString(value);
@@ -168,7 +167,7 @@ public class DictionaryGenerator {
                 continue;
 
             builder.addValue(v);
-            if (samples.size() < nSamples && !samples.contains(v))
+            if (samples.size() < nSamples && !samples.contains(v))
                 samples.add(v);
         }
         return builder.build(baseId);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java 
b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 0d6c9b1..645722c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
 
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
----------------------------------------------------------------------
diff --git 
a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java 
b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
index 9ac8206..6381643 100644
--- 
a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
+++ 
b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git 
a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java 
b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 1c492c4..aa0bc5d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.dict.lookup.FileTable;
 import org.apache.kylin.dict.lookup.HiveTable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java 
b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
index b877eef..68368b5 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/ISegment.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.dict;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
@@ -27,7 +28,7 @@ public interface ISegment {
 
     public abstract int getColumnLength(TblColRef col);
 
-    public abstract Dictionary<?> getDictionary(TblColRef col);
+    public abstract Dictionary<String> getDictionary(TblColRef col);
 
     public String getName();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git 
a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
 
b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
index 13f7394..43d62a3 100644
--- 
a/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
+++ 
b/dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
 
 import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 
 import java.io.IOException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java 
b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 3479dbd..2b6d77d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git 
a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java 
b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 96a7cd6..cc3c637 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -30,7 +30,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionary;
 import org.apache.kylin.dict.TrieDictionaryBuilder;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
 
b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 485170b..fedb2c1 100644
--- 
a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ 
b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -169,7 +169,12 @@
         "expression": "COUNT_DISTINCT",
         "parameter": {
           "type": "column",
-          "value": "LSTG_FORMAT_NAME,SELLER_ID"
+          "value": "LSTG_FORMAT_NAME",
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID",
+            "next_parameter": null
+          }
         },
         "returntype": "hllc(10)"
       },

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
 
b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index c6bd12f..7e6934b 100644
--- 
a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ 
b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -169,7 +169,12 @@
         "expression": "COUNT_DISTINCT",
         "parameter": {
           "type": "column",
-          "value": "LSTG_FORMAT_NAME,SELLER_ID"
+          "value": "LSTG_FORMAT_NAME",
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID",
+            "next_parameter": null
+          }
         },
         "returntype": "hllc(10)"
       },

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
new file mode 100644
index 0000000..4ee8f50
--- /dev/null
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IICapabilityChecker.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.invertedindex;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ */
+public class IICapabilityChecker {
+    private static final Logger logger = 
LoggerFactory.getLogger(IICapabilityChecker.class);
+
+    public static CapabilityResult check(IIInstance ii, SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        result.capable = false;
+
+        // match fact table
+        if (!digest.factTable.equalsIgnoreCase(ii.getFactTable())) {
+            logger.info("Exclude II " + ii.getName() + " because fact table 
unmatch");
+            return result;
+        }
+
+        // match joins
+        boolean matchJoin = isMatchedWithJoins(digest.joinDescs, ii);
+        if (!matchJoin) {
+            logger.info("Exclude II " + ii.getName() + " because unmatched 
joins");
+            return result;
+        }
+
+        // dimensions & measures
+        Collection<TblColRef> dimensionColumns = getDimensionColumns(digest);
+        Collection<FunctionDesc> aggrFunctions = digest.aggregations;
+        Collection<TblColRef> unmatchedDimensions = 
unmatchedDimensions(dimensionColumns, ii);
+        Collection<FunctionDesc> unmatchedAggregations = 
unmatchedAggregations(aggrFunctions, ii);
+
+        // try dimension-as-measure
+        if (!unmatchedAggregations.isEmpty()) {
+            tryDimensionAsMeasures(unmatchedAggregations, digest, ii, result);
+        }
+
+        if (!unmatchedDimensions.isEmpty()) {
+            logger.info("Exclude ii " + ii.getName() + " because unmatched 
dimensions");
+            return result;
+        }
+
+        if (!unmatchedAggregations.isEmpty()) {
+            logger.info("Exclude ii " + ii.getName() + " because unmatched 
aggregations");
+            return result;
+        }
+
+        // cost will be minded by caller
+        result.capable = true;
+        return result;
+    }
+
+    private static boolean isMatchedWithJoins(Collection<JoinDesc> joins, 
IIInstance iiInstance) {
+        IIDesc iiDesc = iiInstance.getDescriptor();
+        List<TableDesc> tables = iiDesc.listTables();
+
+        List<JoinDesc> cubeJoins = new ArrayList<JoinDesc>(tables.size());
+        for (TableDesc tableDesc : tables) {
+            JoinDesc join = null;
+            for (LookupDesc lookup : iiDesc.getModel().getLookups()) {
+                if 
(lookup.getTable().equalsIgnoreCase(tableDesc.getIdentity())) {
+                    join = lookup.getJoin();
+                    cubeJoins.add(join);
+                    break;
+                }
+            }
+        }
+
+        for (JoinDesc j : joins) {
+            // optiq engine can't decide which one is fk or pk
+            String pTable = j.getPrimaryKeyColumns()[0].getTable();
+            String factTable = iiDesc.getModel().getFactTable();
+            if (factTable.equals(pTable)) {
+                j.swapPKFK();
+            }
+
+            // check primary key, all PK column should refer to same tale, the 
Fact Table of iiInstance.
+            // Using first column's table name to check.
+            String fTable = j.getForeignKeyColumns()[0].getTable();
+            if (!factTable.equals(fTable)) {
+                logger.info("Fact Table" + factTable + " not matched in join: 
" + j + " on ii " + iiInstance.getName());
+                return false;
+            }
+
+            // The hashcode() function of JoinDesc has been overwritten,
+            // which takes into consideration: pk,fk,jointype
+            if (!cubeJoins.contains(j)) {
+                logger.info("Query joins don't match on ii " + 
iiInstance.getName());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static Collection<TblColRef> getDimensionColumns(SQLDigest 
sqlDigest) {
+        Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+        Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
+
+        Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
+        dimensionColumns.addAll(groupByColumns);
+        dimensionColumns.addAll(filterColumns);
+        return dimensionColumns;
+    }
+
+    private static Set<TblColRef> unmatchedDimensions(Collection<TblColRef> 
dimensionColumns, IIInstance ii) {
+        HashSet<TblColRef> result = Sets.newHashSet(dimensionColumns);
+        result.removeAll(ii.getDescriptor().listAllDimensions());
+        return result;
+    }
+
+    private static Set<FunctionDesc> 
unmatchedAggregations(Collection<FunctionDesc> aggregations, IIInstance ii) {
+        HashSet<FunctionDesc> result = Sets.newHashSet(aggregations);
+        result.removeAll(ii.getDescriptor().listAllFunctions());
+        return result;
+    }
+
+    private static void tryDimensionAsMeasures(Collection<FunctionDesc> 
unmatchedAggregations, SQLDigest digest, IIInstance ii, CapabilityResult 
result) {
+        IIDesc iiDesc = ii.getDescriptor();
+        Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
+
+        Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
+        while (it.hasNext()) {
+            FunctionDesc functionDesc = it.next();
+
+            if (iiFuncs.contains(functionDesc)) {
+                it.remove();
+                continue;
+            }
+
+            // let calcite handle count
+            if (functionDesc.isCount()) {
+                it.remove();
+                continue;
+            }
+
+            // calcite can do aggregation from columns on-the-fly
+            List<TblColRef> neededCols = 
functionDesc.getParameter().getColRefs();
+            if (neededCols.size() > 0 && 
iiDesc.listAllDimensions().containsAll(neededCols)) {
+                result.influences.add(new 
CapabilityResult.DimensionAsMeasure(functionDesc));
+                it.remove();
+                continue;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index b372c70..86b0543 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -29,6 +29,7 @@ import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -265,15 +266,16 @@ public class IIInstance extends RootPersistentEntity 
implements IRealization {
     }
 
     @Override
-    public boolean isCapable(SQLDigest digest) {
-        //TODO: currently II is nearly omnipotent
-        if (!digest.factTable.equalsIgnoreCase(this.getFactTable()))
-            return false;
-
-        return true;
+    public CapabilityResult isCapable(SQLDigest digest) {
+        CapabilityResult result = IICapabilityChecker.check(this, digest);
+        if (result.capable) {
+            result.cost = getCost(digest);
+        } else {
+            result.cost = -1;
+        }
+        return result;
     }
 
-    @Override
     public int getCost(SQLDigest digest) {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index cb5a1cb..f7e70f4 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.invertedindex.model.IIDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index 33ff1b0..77a876c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.ISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
@@ -271,7 +271,7 @@ public class IISegment implements Comparable<IISegment>, 
ISegment {
     }
 
     @Override
-    public Dictionary<?> getDictionary(TblColRef col) {
+    public Dictionary<String> getDictionary(TblColRef col) {
 
         int index = getTableRecordInfo().findColumn(col);
         return getTableRecordInfo().dict(index);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
index 2594936..164e2b9 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BitMapContainer.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.roaringbitmap.RoaringBitmap;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
index c58261d..334457c 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/CompressedValueContainer.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 
 import com.ning.compress.lzf.LZFDecoder;
 import com.ning.compress.lzf.LZFEncoder;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index baae663..b9f963e 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 11ecdf8..c41a70c 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -21,7 +21,7 @@ package org.apache.kylin.invertedindex.index;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
index da78627..0ed58b0 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  * Created by honma on 11/10/14.

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index c8597bc..f10712a 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -207,6 +207,10 @@ public class IIDesc extends RootPersistentEntity {
         p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc)));
         f1.setParameter(p1);
         f1.setReturnType(returnType);
+        if (f1.isSum() && f1.getReturnDataType().isIntegerFamily()) {
+            f1.setReturnType("bigint");
+        }
+        
         measureDesc.setFunction(f1);
         return measureDesc;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
 
b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index 4ad38b5..50af8a4 100644
--- 
a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ 
b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
 
b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 5f75b04..cfa4ba6 100644
--- 
a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ 
b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.BitMapContainer;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
index 470225e..929408a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -39,11 +41,13 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +79,8 @@ public class BaseCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Text, Text, Text
     private int errorRecordCounter;
     private Text outputKey = new Text();
     private Text outputValue = new Text();
+    protected MeasureIngester<?>[] aggrIngesters;
+    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
     private Object[] measures;
     private byte[][] keyBytesBuf;
     private ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
@@ -116,6 +122,9 @@ public class BaseCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Text, Text, Text
         int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         keyBytesBuf = new byte[colCount][];
 
+        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        dictionaryMap = cubeSegment.buildDictionaryMap();
+
         initNullBytes();
     }
 
@@ -153,52 +162,45 @@ public class BaseCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Text, Text, Text
     private void buildValue(SplittedBytes[] splitBuffers) {
 
         for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+            measures[i] = buildValueOf(i, splitBuffers);
         }
 
         valueBuf.clear();
         measureCodec.encode(measures, valueBuf);
     }
 
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) 
{
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        FunctionDesc func = desc.getFunction();
-        ParameterDesc paramDesc = func.getParameter();
-        int[] flatTableIdx = 
intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // column values
-        else {
-            // for multiple columns, their values are joined
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                if (result == null) {
-                    result = Arrays.copyOf(split.value, split.length);
-                } else {
-                    byte[] newResult = new byte[result.length + split.length];
-                    System.arraycopy(result, 0, newResult, 0, result.length);
-                    System.arraycopy(split.value, 0, newResult, result.length, 
split.length);
-                    result = newResult;
-                }
+    private Object buildValueOf(int idxOfMeasure, SplittedBytes[] 
splitBuffers) {
+        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = 
intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // pick up parameter values
+        ParameterDesc param = function.getParameter();
+        int colParamIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) 
{
+            String value;
+            if (function.isCount()) {
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = getCell(colIdxOnFlatTable[colParamIdx++], 
splitBuffers);
+            } else {
+                value = param.getValue();
             }
+            inputToMeasure[i] = value;
         }
 
-        if (func.isCount() || func.isHolisticCountDistinct()) {
-            // note for holistic count distinct, this value will be ignored
-            result = ONE;
-        }
-
-        if (isNull(result)) {
-            result = null;
-        }
+        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, 
dictionaryMap);
+    }
 
-        return result;
+    private String getCell(int i, SplittedBytes[] splitBuffers) {
+        byte[] bytes = Arrays.copyOf(splitBuffers[i].value, 
splitBuffers[i].length);
+        if (isNull(bytes))
+            return null;
+        else
+            return Bytes.toString(bytes);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
index 36bf80c..c19e54b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
@@ -37,7 +37,7 @@ import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
 import com.google.common.collect.Lists;
@@ -93,7 +93,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, 
ImmutableBytesWrita
         } else { // normal (complex) case that distributes measures to multiple
                  // HBase columns
 
-            inputCodec.decode(value, inputMeasures);
+            inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), inputMeasures);
 
             for (int i = 0; i < n; i++) {
                 outputValue = keyValueCreators.get(i).create(key, 
inputMeasures);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
index a74f2a1..bff9e3a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
@@ -161,9 +161,8 @@ public class CuboidJob extends AbstractHadoopJob {
         // number of reduce tasks
         int numReduceTasks = (int) Math.round(totalReduceInputMB / 
perReduceInputMB * reduceCountRatio);
 
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for
-        // better performance
-        if (cubeDesc.hasHolisticCountDistinctMeasures()) {
+        // adjust reducer number for cube which has DISTINCT_COUNT measures 
for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
             numReduceTasks = numReduceTasks * 4;
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
index bcb4b52..4527f30 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
@@ -30,8 +30,8 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,7 +81,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
         aggs.reset();
 
         for (Text value : values) {
-            codec.decode(value, input);
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), input);
             aggs.aggregate(input);
         }
         aggs.collectStates(result);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
index 2516745..2528e07 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
@@ -19,17 +19,22 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -38,10 +43,14 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -60,6 +69,17 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
     private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
     // life cycle
 
+
+    // for re-encode measures that use dictionary
+    private List<Pair<Integer, MeasureIngester>> dictMeasures;
+    private Map<TblColRef, Dictionary<String>> oldDicts;
+    private Map<TblColRef, Dictionary<String>> newDicts;
+    private List<MeasureDesc> measureDescs;
+    private MeasureCodec codec;
+    private Object[] measureObjs;
+    private ByteBuffer valueBuf;
+    private Text outputValue;
+    
     private Text outputKey = new Text();
 
     private byte[] newKeyBuf;
@@ -133,6 +153,26 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
         System.out.println(sourceCubeSegment);
 
         this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+
+
+        measureDescs = cubeDesc.getMeasures();
+        codec = new MeasureCodec(measureDescs);
+        measureObjs = new Object[measureDescs.size()];
+        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        outputValue = new Text();
+
+        dictMeasures = Lists.newArrayList();
+        for (int i = 0; i < measureDescs.size(); i++) {
+            MeasureDesc measureDesc = measureDescs.get(i);
+            MeasureType measureType = 
measureDesc.getFunction().getMeasureType();
+            if 
(measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == 
false) {
+                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
+            }
+        }
+        if (dictMeasures.size() > 0) {
+            oldDicts = sourceCubeSegment.buildDictionaryMap();
+            newDicts = mergedCubeSegment.buildDictionaryMap();
+        }
     }
 
     @Override
@@ -187,6 +227,21 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
         byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
         outputKey.set(newKey, 0, newKey.length);
 
+
+        // re-encode measures if dictionary is used
+        if (dictMeasures.size() > 0) {
+            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), measureObjs);
+            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+                int i = pair.getFirst();
+                MeasureIngester ingester = pair.getSecond();
+                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], 
measureDescs.get(i), oldDicts, newDicts);
+            }
+            valueBuf.clear();
+            codec.encode(measureObjs, valueBuf);
+            outputValue.set(valueBuf.array(), 0, valueBuf.position());
+            value = outputValue;
+        }
+        
         context.write(outputKey, value);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
deleted file mode 100644
index 35b4662..0000000
--- 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1),honma
- */
-public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, 
Text> {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(NewBaseCuboidMapper.class);
-
-    private String cubeName;
-    private String segmentName;
-    private Cuboid baseCuboid;
-    private CubeInstance cube;
-    private CubeSegment cubeSegment;
-
-    private CubeDesc cubeDesc;
-    private MetadataManager metadataManager;
-    private TableDesc factTableDesc;
-
-    private boolean byteRowDelimiterInferred = false;
-    private byte byteRowDelimiter;
-
-    private int counter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
-    private Object[] measures;
-    private byte[][] keyBytesBuf;
-    private ByteBuffer valueBuf = 
ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    private BytesSplitter bytesSplitter;
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private MeasureCodec measureCodec;
-
-    // deal with table join
-    private HashMap<String, LookupBytesTable> lookupTables;// name -> table
-    private LinkedList<TableJoin> tableJoins;
-    private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar as
-    // TableJoin.dimTblColAsRowKey
-    private int[][] measureColumnIndice;
-    private byte[] nullValue;
-
-    private class TableJoin {
-        public LinkedList<Integer> fkIndice;// zero-based join columns on fact
-        // table
-        public String lookupTableName;
-        public String joinType;
-
-        // Pair.first -> zero-based column index in lookup table
-        // Pair.second -> zero based row key index
-        public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
-
-        private TableJoin(String joinType, LinkedList<Integer> fkIndice, 
String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
-            this.joinType = joinType;
-            this.fkIndice = fkIndice;
-            this.lookupTableName = lookupTableName;
-            this.dimTblColAsRowKey = dimTblColAsRowKey;
-        }
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-
-        KylinConfig config = 
AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        metadataManager = MetadataManager.getInstance(config);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        cubeDesc = cube.getDescriptor();
-        factTableDesc = metadataManager.getTableDesc(cubeDesc.getFactTable());
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        // intermediateTableDesc = new
-        // JoinedFlatTableDesc(cube.getDescriptor());
-
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, 
baseCuboid);
-
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        keyBytesBuf = new byte[colCount][];
-
-        bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 
4096);
-
-        nullValue = new byte[] { (byte) '\\', (byte) 'N' };// As in Hive, null
-        // value is
-        // represented by \N
-
-        prepareJoins();
-        prepareMetrics();
-    }
-
-    private void prepareJoins() throws IOException {
-        this.lookupTables = new HashMap<String, LookupBytesTable>();
-        this.tableJoins = new LinkedList<TableJoin>();
-        this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            JoinDesc join = dim.getJoin();
-            if (join != null) {
-                String joinType = join.getType().toUpperCase();
-                String lookupTableName = dim.getTable();
-
-                // load lookup tables
-                if (!lookupTables.containsKey(lookupTableName)) {
-                    HiveTable htable = new HiveTable(metadataManager, 
lookupTableName);
-                    LookupBytesTable btable = new 
LookupBytesTable(metadataManager.getTableDesc(lookupTableName), 
join.getPrimaryKey(), htable);
-                    lookupTables.put(lookupTableName, btable);
-                }
-
-                // create join infos
-                LinkedList<Integer> fkIndice = new LinkedList<Integer>();
-                for (TblColRef colRef : join.getForeignKeyColumns()) {
-                    fkIndice.add(colRef.getColumn().getZeroBasedIndex());
-                }
-                this.tableJoins.add(new TableJoin(joinType, fkIndice, 
lookupTableName, this.findColumnRowKeyRelationships(dim)));
-
-            } else {
-
-                
this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
-            }
-        }
-
-        // put composite keys joins ahead of single key joins
-        Collections.sort(tableJoins, new Comparator<TableJoin>() {
-            @Override
-            public int compare(TableJoin o1, TableJoin o2) {
-                return 
Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
-            }
-        });
-    }
-
-    private LinkedList<Pair<Integer, Integer>> 
findColumnRowKeyRelationships(DimensionDesc dim) {
-        LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new 
LinkedList<Pair<Integer, Integer>>();
-        for (TblColRef colRef : dim.getColumnRefs()) {
-            int dimTableIndex = colRef.getColumn().getZeroBasedIndex();
-            int rowKeyIndex = 
cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
-            dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, 
rowKeyIndex));
-        }
-        return dimTblColAsRowKey;
-    }
-
-    private void prepareMetrics() {
-        List<MeasureDesc> measures = cubeDesc.getMeasures();
-        int measureSize = measures.size();
-        measureColumnIndice = new int[measureSize][];
-        for (int i = 0; i < measureSize; i++) {
-            FunctionDesc func = measures.get(i).getFunction();
-            List<TblColRef> colRefs = func.getParameter().getColRefs();
-            if (colRefs == null) {
-                measureColumnIndice[i] = null;
-            } else {
-                measureColumnIndice[i] = new int[colRefs.size()];
-                for (int j = 0; j < colRefs.size(); j++) {
-                    TblColRef c = colRefs.get(j);
-                    int factTblIdx = 
factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
-                    measureColumnIndice[i][j] = factTblIdx;
-                }
-            }
-        }
-    }
-
-    private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
-        return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
-    }
-
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
-
-        int filledDimension = 0;// debug
-
-        // join lookup tables, and fill into RowKey the columns in lookup table
-        for (TableJoin tableJoin : this.tableJoins) {
-            String dimTblName = tableJoin.lookupTableName;
-            LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
-            ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
-            for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
-                rawKey[i] = new 
ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
-            }
-            Array<ByteArray> key = new Array<ByteArray>(rawKey);
-            ByteArray[] dimRow = dimTbl.getRow(key);
-            if (dimRow == null) {
-                if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
-                    return null;
-                } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
-                    for (Pair<Integer, Integer> relation : 
tableJoin.dimTblColAsRowKey) {
-                        keyBytesBuf[relation.getSecond()] = nullValue;
-                        filledDimension++;
-                    }
-                }
-            } else {
-                for (Pair<Integer, Integer> relation : 
tableJoin.dimTblColAsRowKey) {
-                    keyBytesBuf[relation.getSecond()] = 
dimRow[relation.getFirst()].data;
-                    filledDimension++;
-                }
-            }
-        }
-
-        // fill into RowKey the columns in fact table
-        for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
-            keyBytesBuf[relation.getSecond()] = 
trimSplitBuffer(splitBuffers[relation.getFirst()]);
-            filledDimension++;
-        }
-
-        assert filledDimension == keyBytesBuf.length;
-
-        // all the row key slots(keyBytesBuf) should be complete now
-        return rowKeyEncoder.encode(keyBytesBuf);
-    }
-
-    private void buildValue(SplittedBytes[] splitBuffers) {
-
-        for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
-        }
-
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
-    }
-
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) 
{
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        ParameterDesc paramDesc = desc.getFunction().getParameter();
-        int[] flatTableIdx = this.measureColumnIndice[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // column values
-        else {
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                result = Arrays.copyOf(split.value, split.length);
-            }
-        }
-
-        if (desc.getFunction().isCount()) {
-            result = Bytes.toBytes("1");
-        }
-
-        return result;
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws 
IOException, InterruptedException {
-        // combining the hive table flattening logic into base cuboid building.
-        // the input of this mapper is the fact table rows
-
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
-        if (!byteRowDelimiterInferred)
-            byteRowDelimiter = 
bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), 
factTableDesc.getColumns().length);
-
-        bytesSplitter.split(value.getBytes(), value.getLength(), 
byteRowDelimiter);
-
-        try {
-            byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-            if (rowKey == null)
-                return;// skip this fact table row
-
-            outputKey.set(rowKey, 0, rowKey.length);
-
-            buildValue(bytesSplitter.getSplitBuffers());
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-
-        } catch (Throwable t) {
-            logger.error("", t);
-            context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, 
"Error records").increment(1L);
-            return;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java 
b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index c9988fc..fa7e51b 100644
--- a/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/job/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -45,7 +45,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
index f100490..58d093a 100644
--- 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
+++ 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
@@ -36,7 +37,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.junit.After;
 import org.junit.Before;
@@ -100,7 +101,7 @@ public class BaseCuboidMapperTest extends 
LocalFileMetadataTestCase {
     private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, 
String... valueStr) {
         MeasureCodec codec = new MeasureCodec(measures);
         Object[] values = new Object[measures.size()];
-        codec.decode(valueBytes, values);
+        codec.decode(ByteBuffer.wrap(valueBytes.getBytes()), values);
         assertTrue(new BigDecimal(valueStr[0]).equals(values[0]));
         assertTrue(new BigDecimal(valueStr[1]).equals(values[1]));
         assertTrue(new BigDecimal(valueStr[2]).equals(values[2]));
@@ -138,6 +139,6 @@ public class BaseCuboidMapperTest extends 
LocalFileMetadataTestCase {
         assertEquals(511, Bytes.toLong(cuboidId));
         assertEquals(22, restKey.length);
 
-        verifyMeasures(cube.getDescriptor().getMeasures(), 
result.get(0).getSecond(), "0", "0", "0", "1","22");
+        verifyMeasures(cube.getDescriptor().getMeasures(), 
result.get(0).getSecond(), "0", "0", "0", "1", "22");
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
index 867faa6..c9b7eba 100644
--- 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
+++ 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper2Test.java
@@ -34,7 +34,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
index 837f759..16cfa89 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
@@ -37,7 +37,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureCodec;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 9a1fdfb..ea5c163 100644
--- 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -33,7 +33,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git 
a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java 
b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
index 7ce6ee9..d60ec67 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapperTest.java
@@ -24,8 +24,11 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
@@ -37,6 +40,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
     MapReduceDriver<Text, Text, Text, Text, Text, Text> mapReduceDriver;
 
@@ -81,7 +86,17 @@ public class NDCuboidMapperTest extends 
LocalFileMetadataTestCase {
         byte[] resultValue = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 
23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
         Pair<Text, Text> output1 = new Pair<Text, Text>(new Text(resultKey), 
new Text(resultValue));
 
-        assertTrue(result.contains(output1));
+
+        //As we will truncate decimal(KYLIN-766), value will no longer equals 
to resultValue
+        Collection<Text> keys = Collections2.transform(result, new 
Function<Pair<Text, Text>, Text>() {
+            @Nullable
+            @Override
+            public Text apply(Pair<Text, Text> input) {
+                return input.getFirst();
+            }
+        });
+        assertTrue(keys.contains(output1.getFirst()));
+        assertTrue(!result.contains(output1));
 
         long[] keySet = new long[result.size()];
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
new file mode 100644
index 0000000..7b74225
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+
+import org.apache.kylin.metadata.datatype.DataType;
+
+/**
+ */
+@SuppressWarnings("serial")
+abstract public class MeasureAggregator<V> implements Serializable {
+
+    public static MeasureAggregator<?> create(String funcName, DataType 
dataType) {
+        return MeasureTypeFactory.create(funcName, dataType).newAggregator();
+    }
+
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+                + 8 // ref to BigDecimal
+                + 8 // BigDecimal obj shell
+                + 100; // guess of BigDecimal internal
+    }
+
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to DoubleWritable
+        + 8 // DoubleWritable obj shell
+        + 8; // size of double
+        */
+    }
+
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to LongWritable
+        + 8 // LongWritable obj shell
+        + 8; // size of long
+        */
+    }
+
+    // 
============================================================================
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    abstract public void reset();
+
+    abstract public void aggregate(V value);
+
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
new file mode 100644
index 0000000..12832ff
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = func.getMeasureType().newAggregator();
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    public void reset() {
+        for (int i = 0; i < aggs.length; i++) {
+            aggs[i].reset();
+        }
+    }
+
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
new file mode 100644
index 0000000..6209079
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new 
MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        String[] dataTypes = new String[measureDescs.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
+        }
+        init(dataTypes);
+    }
+
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
+        }
+    }
+
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            result[i] = serializers[i].deserialize(buf);
+        }
+    }
+
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i].serialize(values[i], out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git 
a/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java 
b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
new file mode 100644
index 0000000..0076252
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+abstract public class MeasureIngester<V> {
+
+    public static MeasureIngester<?> create(MeasureDesc measure) {
+        return measure.getFunction().getMeasureType().newIngester();
+    }
+
+    public static MeasureIngester<?>[] create(Collection<MeasureDesc> 
measures) {
+        MeasureIngester<?>[] result = new MeasureIngester<?>[measures.size()];
+        int i = 0;
+        for (MeasureDesc measure : measures) {
+            result[i++] = create(measure);
+        }
+        return result;
+    }
+
+    abstract public V valueOf(String[] values, MeasureDesc measureDesc, 
Map<TblColRef, Dictionary<String>> dictionaryMap);
+
+    public V reEncodeDictionary(V value, MeasureDesc measureDesc, 
Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> 
newDicts) {
+        throw new UnsupportedOperationException();
+    }
+}

Reply via email to