Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 74536a8e6 -> 933919846


Make RowKeyEncoder work with all build/merge engines


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/48d08ef6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/48d08ef6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/48d08ef6

Branch: refs/heads/KYLIN-1126
Commit: 48d08ef6e8848c941c11a8fbc090eb10a4f5e2ca
Parents: 74536a8
Author: honma <ho...@ebay.com>
Authored: Fri Nov 6 17:19:22 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Nov 6 17:19:22 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/ByteArray.java |  5 ++
 .../org/apache/kylin/common/util/BasicTest.java | 54 +++++++++++++----
 .../java/org/apache/kylin/cube/CubeManager.java |  1 -
 .../java/org/apache/kylin/cube/CubeSegment.java | 11 +---
 .../kylin/cube/common/RowKeySplitter.java       | 32 +++++-----
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    | 16 ++++-
 .../org/apache/kylin/cube/kv/RowConstants.java  |  4 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java |  2 +-
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 19 +++++-
 .../kylin/cube/kv/RowKeyEncoderProvider.java    | 45 ++++++++++++++
 .../org/apache/kylin/cube/model/CubeDesc.java   | 11 ++--
 .../kylin/metadata/model/IStorageAware.java     |  1 +
 .../kylin/engine/mr/BatchCubingJobBuilder.java  | 11 +++-
 .../kylin/engine/mr/BatchMergeJobBuilder.java   |  5 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 64 ++++++++++++--------
 .../engine/mr/steps/MergeCuboidMapper.java      | 61 +++++++++++--------
 .../kylin/engine/mr/steps/NDCuboidMapper.java   | 41 ++++++-------
 .../spark/cube/DefaultTupleConverter.java       | 30 ++++-----
 .../storage/hbase/cube/v1/CubeStorageQuery.java | 11 ++--
 webapp/app/js/model/cubeDescModel.js            |  3 +-
 20 files changed, 282 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
index a388dda..ccd5001 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java
@@ -90,6 +90,11 @@ public class ByteArray implements Comparable<ByteArray>, Serializable {
         set(o.data, o.offset, o.length);
     }
 
+    public void set(int offset, int length) {
+        this.offset = offset;
+        this.length = length;
+    }
+
     public void setLength(int length) {
         this.length = length;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 259f977..2beb2c6 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -19,12 +19,12 @@
 package org.apache.kylin.common.util;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 
 import org.apache.commons.configuration.ConfigurationException;
@@ -34,14 +34,14 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.TreeMultiset;
-import com.sun.management.OperatingSystemMXBean;
 
 /**
-* <p/>
-* Keep this test case to test basic java functionality
-* development concept proving use
-*/
+ * <p/>
+ * Keep this test case to test basic java functionality
+ * development concept proving use
+ */
 @Ignore("convenient trial tool for dev")
 @SuppressWarnings("unused")
 public class BasicTest {
@@ -73,14 +73,46 @@ public class BasicTest {
         Count, DimensionAsMetric, DistinctCount, Normal
     }
 
+    public static int counter = 1;
+
+    class X {
+        byte[] mm = new byte[100];
+
+        public X() {
+            counter++;
+        }
+    }
+
     @Test
     public void testxx() throws InterruptedException {
-        while (true) {
-            OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
-            System.out.println(operatingSystemMXBean.getSystemCpuLoad());
-            System.out.println(operatingSystemMXBean.getFreePhysicalMemorySize());
-            Thread.sleep(1000);
+        byte[][] data = new byte[10000000][];
+        byte[] temp = new byte[100];
+        for (int i = 0; i < 100; i++) {
+            temp[i] = (byte) i;
+        }
+        for (int i = 0; i < 10000000; i++) {
+            data[i] = new byte[100];
+        }
+
+        long wallClock = System.currentTimeMillis();
+
+        for (int i = 0; i < 10000000; i++) {
+            System.arraycopy(temp, 0, data[i], 0, 100);
+        }
+        System.out.println("Time Consumed: " + (System.currentTimeMillis() - wallClock));
+    }
+
+    @Test
+    public void testyy() throws InterruptedException {
+        long wallClock = System.currentTimeMillis();
+
+        HashMap<Integer, byte[]> map = Maps.newHashMap();
+        for (int i = 0; i < 10000000; i++) {
+            byte[] a = new byte[100];
+            map.put(i, a);
         }
+
+        System.out.println("Time Consumed: " + (System.currentTimeMillis() - wallClock));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3ead061..2232f01 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -568,7 +568,6 @@ public class CubeManager implements IRealizationProvider {
         segment.setDateRangeEnd(endDate);
         segment.setStatus(SegmentStatusEnum.NEW);
         segment.setStorageLocationIdentifier(generateStorageLocation());
-        segment.setEnableSharding(true);
 
         segment.setCubeInstance(cubeInstance);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 1a34596..7999ea3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -31,6 +31,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -70,8 +71,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     private String lastBuildJobID;
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
-    @JsonProperty("enable_sharding")
-    private boolean enableSharding = true;
     @JsonProperty("cuboid_shard_nums")
     private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
     @JsonProperty("total_shards")
@@ -372,15 +371,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     }
 
     public boolean isEnableSharding() {
-        return enableSharding;
-    }
-
-    public void setEnableSharding(boolean enableSharding) {
-        this.enableSharding = enableSharding;
+        return getCubeDesc().isEnableSharding();
     }
 
     public int getRowKeyPreambleSize() {
-        return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
+        return isEnableSharding() ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 0111cee..56247bc 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -36,25 +36,26 @@ public class RowKeySplitter {
     private int bufferSize;
 
     private long lastSplittedCuboidId;
-    private short lastSplittedShard;
+    private boolean enableSharding;
 
     public SplittedBytes[] getSplitBuffers() {
         return splitBuffers;
     }
 
-    public int getBufferSize() {
-        return bufferSize;
+    public int getBodySplitOffset() {
+        if (enableSharding) {
+            return 2;//shard+cuboid
+        } else {
+            return 1;//cuboid
+        }
     }
 
-    public long getLastSplittedCuboidId() {
-        return lastSplittedCuboidId;
+    public int getBufferSize() {
+        return bufferSize;
     }
 
-    public short getLastSplittedShard() {
-        return lastSplittedShard;
-    }
-
     public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
+        this.enableSharding = cubeSeg.isEnableSharding();
         this.cubeDesc = cubeSeg.getCubeDesc();
         this.colIO = new RowKeyColumnIO(cubeSeg);
 
@@ -73,11 +74,13 @@ public class RowKeySplitter {
         this.bufferSize = 0;
         int offset = 0;
 
-        // extract shard
-        SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
-        shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
-        System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
-        offset += RowConstants.ROWKEY_SHARDID_LEN;
+        if (enableSharding) {
+            // extract shard
+            SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
+            shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
+            System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            offset += RowConstants.ROWKEY_SHARDID_LEN;
+        }
 
         // extract cuboid id
         SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
@@ -86,7 +89,6 @@ public class RowKeySplitter {
         offset += RowConstants.ROWKEY_CUBOIDID_LEN;
 
         lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
-        lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
         Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId);
 
         // rowkey columns

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index c9a304e..4316376 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.cube.kv;
 
 import java.util.Map;
 
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,9 +40,9 @@ public abstract class AbstractRowKeyEncoder {
     protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
     public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
 
-    protected final Cuboid cuboid;
-    protected final CubeSegment cubeSeg;
     protected byte blankByte = DEFAULT_BLANK_BYTE;
+    protected final CubeSegment cubeSeg;
+    protected Cuboid cuboid;
 
     public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
         return new RowKeyEncoder(cubeSeg, cuboid);
@@ -60,6 +61,14 @@ public abstract class AbstractRowKeyEncoder {
         return cuboid.getId();
     }
 
+    /**
+     * Re-points this encoder at another cuboid. Only the cuboid reference is replaced;
+     * any state a subclass may have derived from the previous cuboid is not recomputed.
+     */
+    public void setCuboid(Cuboid cuboid) {
+        this.cuboid = cuboid;
+    }
+
     abstract public byte[] createBuf();
 
     /**
@@ -70,6 +79,13 @@ public abstract class AbstractRowKeyEncoder {
      */
     abstract public void encode(GTRecord record, ImmutableBitSet keyColumns, byte[] buf);
 
+    /**
+     * Given an already-encoded rowkey body, writes the full rowkey: the header (shard, if sharding applies, and cuboid id) followed by the body.
+     * @param bodyBytes the encoded rowkey body, i.e. everything after the header
+     * @param outputBuf receives the complete rowkey
+     */
+    abstract public void encode(ByteArray bodyBytes, ByteArray outputBuf);
+
     abstract public byte[] encode(Map<TblColRef, String> valueMap);
 
     abstract public byte[] encode(byte[][] values);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
index 09bccc3..3c5d3ac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -34,7 +34,7 @@ public class RowConstants {
 
     public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
     public static final int ROWKEY_SHARD_AND_CUBOID_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN;
-    
+
     public static final byte BYTE_ZERO = 0;
     public static final byte BYTE_ONE = 1;
 
@@ -43,7 +43,7 @@ public class RowConstants {
     public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7);
     public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 };
 
-    public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB
+    public static final int ROWKEY_BUFFER_SIZE = 65 * 256;// a little more than 64 dimensions * 256 bytes each
     public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB
 
     // marker class

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 3506845..e4a6a52 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -58,7 +58,7 @@ public class RowKeyDecoder {
 
         SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
 
-        int offset = 2; // skip shard and cuboid id part
+        int offset = rowKeySplitter.getBodySplitOffset(); // skip the shard split (only present when sharding is enabled) and the cuboid id split
 
         for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
             TblColRef col = this.cuboid.getColumns().get(i);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 4d1055b..990cf06 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -33,13 +33,15 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.base.Preconditions;
+
 public class RowKeyEncoder extends AbstractRowKeyEncoder {
 
     private int bodyLength = 0;
     private RowKeyColumnIO colIO;
     protected boolean enableSharding;
 
-    protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
+    public RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
         super(cubeSeg, cuboid);
         enableSharding = cubeSeg.isEnableSharding();
         colIO = new RowKeyColumnIO(cubeSeg);
@@ -48,11 +50,11 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
         }
     }
 
-    protected int getHeaderLength() {
+    public int getHeaderLength() {
         return cubeSeg.getRowKeyPreambleSize();
     }
 
-    protected int getBytesLength() {
+    public int getBytesLength() {
         return getHeaderLength() + bodyLength;
     }
 
@@ -86,6 +88,19 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
     }
 
     @Override
+    public void encode(ByteArray bodyBytes, ByteArray outputBuf) {
+        Preconditions.checkState(bodyBytes.length() == bodyLength, "bodybytes length: " + bodyBytes.length() + " expected body length: " + bodyLength);
+        Preconditions.checkState(bodyBytes.length() + getHeaderLength() == outputBuf.length(),//
+                "bodybytes length: " + bodyBytes.length() + " outputBuf length: " + outputBuf.length() + " header length: " + getHeaderLength());
+        // the body is copied to absolute index getHeaderLength() and fillHeader() only gets the raw array, so outputBuf must start at index 0
+        Preconditions.checkState(outputBuf.offset() == 0, "outputBuf offset must be 0 but is " + outputBuf.offset());
+        System.arraycopy(bodyBytes.array(), bodyBytes.offset(), outputBuf.array(), getHeaderLength(), bodyLength);
+
+        //fill shard and cuboid
+        fillHeader(outputBuf.array());
+    }
+
+    @Override
     public byte[] encode(Map<TblColRef, String> valueMap) {
         List<byte[]> valueList = new ArrayList<byte[]>();
         for (TblColRef bdCol : cuboid.getColumns()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java
new file mode 100644
index 0000000..cb36259
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Map;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Hands out one {@link RowKeyEncoder} per cuboid of a single cube segment.
+ * <p>
+ * Encoders are created lazily and cached by cuboid id. An encoder is never re-pointed
+ * at another cuboid via {@code setCuboid}: that only swaps the cuboid reference and leaves
+ * RowKeyEncoder's bodyLength (which sizes and validates the rowkey body) unchanged.
+ * <p>
+ * Not thread safe.
+ */
+public class RowKeyEncoderProvider {
+
+    private final CubeSegment cubeSegment;
+    private final Map<Long, RowKeyEncoder> rowKeyEncoders = Maps.newHashMap();
+
+    public RowKeyEncoderProvider(CubeSegment cubeSegment) {
+        this.cubeSegment = cubeSegment;
+    }
+
+    /**
+     * @param cuboid the cuboid whose rowkeys are to be encoded
+     * @return the encoder bound to {@code cuboid}, created on the first request for its id
+     */
+    public RowKeyEncoder getRowkeyEncoder(Cuboid cuboid) {
+        RowKeyEncoder rowKeyEncoder = rowKeyEncoders.get(cuboid.getId());
+        if (rowKeyEncoder == null) {
+            rowKeyEncoder = new RowKeyEncoder(cubeSegment, cuboid);
+            rowKeyEncoders.put(cuboid.getId(), rowKeyEncoder);
+        }
+        return rowKeyEncoder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index b51e6cb..eced0f2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -134,6 +134,10 @@ public class CubeDesc extends RootPersistentEntity {
     private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
     private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
 
+    public boolean isEnableSharding() {
+        return storageType == IStorageAware.ID_SHARDED_HBASE;
+    }
+
     /**
      * Error messages during resolving json metadata
      */
@@ -662,7 +666,7 @@ public class CubeDesc extends RootPersistentEntity {
 
             if (colRefs.isEmpty() == false)
                 p.setColRefs(colRefs);
-            
+
             // verify holistic count distinct as a dependent measure
             if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
                 throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
@@ -822,17 +826,16 @@ public class CubeDesc extends RootPersistentEntity {
         this.engineType = engineType;
     }
 
-    
     public List<TblColRef> getAllColumnsNeedDictionary() {
         List<TblColRef> result = Lists.newArrayList();
-        
+
         for (RowKeyColDesc rowKeyColDesc : rowkey.getRowKeyColumns()) {
             TblColRef colRef = rowKeyColDesc.getColRef();
             if (rowkey.isUseDictionary(colRef)) {
                 result.add(colRef);
             }
         }
-        
+
         for (TblColRef colRef : measureDisplayColumns) {
             if (!result.contains(colRef))
                 result.add(colRef);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
index ea1aae9..e552574 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
@@ -22,6 +22,7 @@ public interface IStorageAware {
 
     public static final int ID_HBASE = 0;
     public static final int ID_HYBRID = 1;
+    public static final int ID_SHARDED_HBASE = 2;
 
     int getStorageType();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index d00f592..382a124 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -28,22 +28,27 @@ import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class BatchCubingJobBuilder extends JobBuilderSupport {
-    
+
     private static final Logger logger = LoggerFactory.getLogger(BatchCubingJobBuilder.class);
-    
+
     private final IMRBatchCubingInputSide inputSide;
     private final IMRBatchCubingOutputSide outputSide;
 
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
+
+        Preconditions.checkArgument(!newSegment.isEnableSharding(), "V1 job engine does not support building sharded cubes");
+
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
         this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to BUILD segment " + seg);
-        
+
         final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index bc377ed..1743573 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -39,12 +39,15 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
     public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
+
+        Preconditions.checkArgument(!mergeSegment.isEnableSharding(), "V1 job engine does not support merging sharded cubes");
+
         this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
     }
 
     public CubingJob build() {
         logger.info("MR_V1 new job to MERGE segment " + seg);
-        
+
         final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 9b25c97..50f3d4c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -20,10 +20,10 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.HashMap;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
@@ -33,6 +33,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
@@ -68,8 +70,10 @@ public class MergeCuboidFromStorageMapper extends 
KylinMapper<Object, Object, By
     private IMRStorageInputFormat storageInputFormat;
 
     private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private byte[] newKeyBuf;
+    private byte[] newKeyBodyBuf;
+    private ByteArray newKeyBuf;
     private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
 
     private HashMap<TblColRef, Boolean> dictsNeedMerging = new 
HashMap<TblColRef, Boolean>();
 
@@ -106,12 +110,14 @@ public class MergeCuboidFromStorageMapper extends 
KylinMapper<Object, Object, By
         mergedCubeSegment = cube.getSegment(segmentName, 
SegmentStatusEnum.NEW);
         storageInputFormat = 
MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
 
-        newKeyBuf = new byte[256]; // size will auto-grow
+        newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; // size 
will auto-grow
+        newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
 
         sourceCubeSegment = storageInputFormat.findSourceSegment(context);
         logger.info("Source cube segment: " + sourceCubeSegment);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
         codec = new MeasureCodec(cubeDesc.getMeasures());
     }
@@ -125,19 +131,15 @@ public class MergeCuboidFromStorageMapper extends 
KylinMapper<Object, Object, By
         Preconditions.checkState(key.offset() == 0);
 
         long cuboidID = rowKeySplitter.split(key.array());
-        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+        RowKeyEncoder rowkeyEncoder = 
rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
-
-        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, 
RowConstants.ROWKEY_SHARDID_LEN);
-        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
-        
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, 
RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+        int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
 
         for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+            int useSplit = i + bodySplitOffset;
             TblColRef col = cuboid.getColumns().get(i);
 
             if (this.checkNeedMerging(col)) {
@@ -146,38 +148,48 @@ public class MergeCuboidFromStorageMapper extends 
KylinMapper<Object, Object, By
                 Dictionary<?> sourceDict = 
dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
                 Dictionary<?> mergedDict = 
dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
 
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - 
bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - 
bufOffset || //
+                        mergedDict.getSizeOfValue() > newKeyBodyBuf.length - 
bufOffset || //
+                        mergedDict.getSizeOfId() > newKeyBodyBuf.length - 
bufOffset) {
+                    //also use this buf to hold value before translating
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, 
oldBuf.length);
                 }
 
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i 
+ 1].value, 0, splittedByteses[i + 1].length);
+                int idInSourceDict = 
BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, 
splittedByteses[useSplit].length);
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, 
newKeyBodyBuf, bufOffset);
 
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, 
newKeyBuf, bufOffset);
                 int idInMergedDict;
                 if (size < 0) {
                     idInMergedDict = mergedDict.nullId();
                 } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, 
bufOffset, size);
+                    idInMergedDict = 
mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, 
mergedDict.getSizeOfId());
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, 
bufOffset, mergedDict.getSizeOfId());
 
                 bufOffset += mergedDict.getSizeOfId();
             } else {
                 // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - 
bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                while (splittedByteses[useSplit].length > newKeyBodyBuf.length 
- bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, 
oldBuf.length);
                 }
 
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, 
bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
+                System.arraycopy(splittedByteses[useSplit].value, 0, 
newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
+                bufOffset += splittedByteses[useSplit].length;
             }
         }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), 
newKeyBuf);
+        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
         valueBuf.clear();
         codec.encode(value, valueBuf);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 6301f3d..0b68e59 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -27,6 +26,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
@@ -35,6 +35,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
@@ -60,8 +62,10 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
 
     private Text outputKey = new Text();
 
-    private byte[] newKeyBuf;
+    private byte[] newKeyBodyBuf;
+    private ByteArray newKeyBuf;
     private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
 
     private HashMap<TblColRef, Boolean> dictsNeedMerging = new 
HashMap<TblColRef, Boolean>();
 
@@ -95,13 +99,15 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
         mergedCubeSegment = cube.getSegment(segmentName, 
SegmentStatusEnum.NEW);
 
         // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        newKeyBuf = new byte[256];// size will auto-grow
+        newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will 
auto-grow
+        newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
 
         // decide which source segment
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
         sourceCubeSegment = findSourceSegment(fileSplit, cube);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
     }
 
     private static final Pattern JOB_NAME_PATTERN = 
Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
@@ -135,17 +141,15 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
     @Override
     public void map(Text key, Text value, Context context) throws IOException, 
InterruptedException {
         long cuboidID = rowKeySplitter.split(key.getBytes());
-        short shard = rowKeySplitter.getLastSplittedShard();
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
+        RowKeyEncoder rowkeyEncoder = 
rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
         SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
         int bufOffset = 0;
-        BytesUtil.writeShort(shard, newKeyBuf, bufOffset, 
RowConstants.ROWKEY_SHARDID_LEN);
-        bufOffset += RowConstants.ROWKEY_SHARDID_LEN;
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, 
RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
+        int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
 
         for (int i = 0; i < cuboid.getColumns().size(); ++i) {
+            int useSplit = i + bodySplitOffset;
             TblColRef col = cuboid.getColumns().get(i);
 
             if (this.checkNeedMerging(col)) {
@@ -154,38 +158,47 @@ public class MergeCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
                 Dictionary<?> sourceDict = 
dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
                 Dictionary<?> mergedDict = 
dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
 
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - 
bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - 
bufOffset || //
+                        mergedDict.getSizeOfValue() > newKeyBodyBuf.length - 
bufOffset || //
+                        mergedDict.getSizeOfId() > newKeyBodyBuf.length - 
bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, 
oldBuf.length);
                 }
 
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i 
+ 1].value, 0, splittedByteses[i + 1].length);
+                int idInSourceDict = 
BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, 
splittedByteses[useSplit].length);
                 int idInMergedDict;
 
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, 
newKeyBuf, bufOffset);
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, 
newKeyBodyBuf, bufOffset);
                 if (size < 0) {
                     idInMergedDict = mergedDict.nullId();
                 } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, 
bufOffset, size);
+                    idInMergedDict = 
mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
 
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, 
mergedDict.getSizeOfId());
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, 
bufOffset, mergedDict.getSizeOfId());
                 bufOffset += mergedDict.getSizeOfId();
             } else {
                 // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - 
bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
+                while (splittedByteses[useSplit].length > newKeyBodyBuf.length 
- bufOffset) {
+                    byte[] oldBuf = newKeyBodyBuf;
+                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, 
oldBuf.length);
                 }
 
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, 
bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
+                System.arraycopy(splittedByteses[useSplit].value, 0, 
newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
+                bufOffset += splittedByteses[useSplit].length;
             }
         }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), 
newKeyBuf);
+        outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
         context.write(outputKey, value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 2180dd6..9bebfdb 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -23,8 +23,7 @@ import java.util.Collection;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -33,6 +32,8 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -59,8 +60,10 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> {
     private int handleCounter;
     private int skipCounter;
 
-    private byte[] keyBuf = new byte[4096];
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = 
ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
     private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -79,19 +82,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> {
         cuboidScheduler = new CuboidScheduler(cubeDesc);
 
         rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+        rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
     }
 
     private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, 
SplittedBytes[] splitBuffers) {
-        int offset = 0;
-
-        //shard id will be filled after other contents
-        offset += RowConstants.ROWKEY_SHARDID_LEN;
-
-        // cuboid id
-        System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, 
childCuboid.getBytes().length);
-        offset += RowConstants.ROWKEY_CUBOIDID_LEN;
+        RowKeyEncoder rowkeyEncoder = 
rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
 
-        int bodyOffset = offset;
+        int offset = 0;
 
         // rowkey columns
         long mask = Long.highestOneBit(parentCuboid.getId());
@@ -104,7 +101,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> {
                                               // 1
                 if ((mask & childCuboidId) > 0) {// if the child cuboid has 
this
                                                  // column
-                    System.arraycopy(splitBuffers[index].value, 0, keyBuf, 
offset, splitBuffers[index].length);
+                    System.arraycopy(splitBuffers[index].value, 0, 
newKeyBodyBuf, offset, splitBuffers[index].length);
                     offset += splitBuffers[index].length;
                 }
                 index++;
@@ -112,13 +109,15 @@ public class NDCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
             mask = mask >> 1;
         }
 
-        //fill shard
-        short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId);
-        short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - 
bodyOffset, cuboidShardNum);
-        short finalShard = 
ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), 
shardOffset, cubeSegment.getTotalShards());
-        BytesUtil.writeShort(finalShard, keyBuf, 0, 
RowConstants.ROWKEY_SHARDID_LEN);
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), 
newKeyBuf);
 
-        return offset;
+        return fullKeySize;
     }
 
     @Override
@@ -147,8 +146,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, 
Text, Text> {
 
         for (Long child : myChildren) {
             Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int keyLength = buildKey(parentCuboid, childCuboid, 
rowKeySplitter.getSplitBuffers());
-            outputKey.set(keyBuf, 0, keyLength);
+            int fullKeySize = buildKey(parentCuboid, childCuboid, 
rowKeySplitter.getSplitBuffers());
+            outputKey.set(newKeyBuf.array(), 0, fullKeySize);
             context.write(outputKey, value);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
index 986e45e..6b3a82c 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
@@ -22,12 +22,11 @@ import java.util.BitSet;
 import java.util.Map;
 
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -42,11 +41,14 @@ public final class DefaultTupleConverter implements 
TupleConverter {
     private final CubeSegment segment;
     private final int measureCount;
     private final Map<TblColRef, Integer> columnLengthMap;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+    private byte[] rowKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
 
     public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> 
columnLengthMap) {
         this.segment = segment;
         this.measureCount = segment.getCubeDesc().getMeasures().size();
         this.columnLengthMap = columnLengthMap;
+        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(this.segment);
     }
 
     private ByteBuffer getValueBuf() {
@@ -65,11 +67,8 @@ public final class DefaultTupleConverter implements 
TupleConverter {
 
     @Override
     public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord 
record) {
-        int bytesLength = RowConstants.ROWKEY_HEADER_LEN;
         Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId);
-        for (TblColRef column : cuboid.getColumns()) {
-            bytesLength += columnLengthMap.get(column);
-        }
+        RowKeyEncoder rowkeyEncoder = 
rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
 
         final int dimensions = BitSet.valueOf(new long[] { cuboidId 
}).cardinality();
         int[] measureColumnsIndex = getMeasureColumnsIndex();
@@ -77,22 +76,15 @@ public final class DefaultTupleConverter implements 
TupleConverter {
             measureColumnsIndex[i] = dimensions + i;
         }
 
-        byte[] key = new byte[bytesLength];
-        System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, 
RowConstants.ROWKEY_CUBOIDID_LEN);
-        int header = RowConstants.ROWKEY_HEADER_LEN;
-        int offSet = header;
+        int offSet = 0;
         for (int x = 0; x < dimensions; x++) {
             final ByteArray byteArray = record.get(x);
-            System.arraycopy(byteArray.array(), byteArray.offset(), key, 
offSet, byteArray.length());
+            System.arraycopy(byteArray.array(), byteArray.offset(), 
rowKeyBodyBuf, offSet, byteArray.length());
             offSet += byteArray.length();
         }
 
-        //fill shard
-        short cuboidShardNum = segment.getCuboidShardNum(cuboidId);
-        short shardOffset = ShardingHash.getShard(key, header, offSet - 
header, cuboidShardNum);
-        short cuboidShardBase = segment.getCuboidBaseShard(cuboidId);
-        short finalShard = ShardingHash.normalize(cuboidShardBase, 
shardOffset, segment.getTotalShards());
-        BytesUtil.writeShort(finalShard, key, 0, 
RowConstants.ROWKEY_SHARDID_LEN);
+        byte[] rowKey = rowkeyEncoder.createBuf();
+        rowkeyEncoder.encode(new ByteArray(rowKeyBodyBuf), new 
ByteArray(rowKey));
 
         ByteBuffer valueBuf = getValueBuf();
         valueBuf.clear();
@@ -100,6 +92,6 @@ public final class DefaultTupleConverter implements 
TupleConverter {
 
         byte[] value = new byte[valueBuf.position()];
         System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position());
-        return new Tuple2<>(key, value);
+        return new Tuple2<>(rowKey, value);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index c62308e..f84e4e6 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -33,9 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
@@ -75,6 +72,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 
 @SuppressWarnings("unused")
 public class CubeStorageQuery implements ICachableStorageQuery {
@@ -482,7 +482,10 @@ public class CubeStorageQuery implements 
ICachableStorageQuery {
         dropUnhitSegments(result);
         logger.info("hbasekeyrange count after dropping unhit :" + 
result.size());
 
-        result = duplicateRangeByShard(result);
+        //TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like 
CubeHBaseRPC, not do so because v1 query engine is retiring. not worth changing 
it
+        if (cubeDesc.isEnableSharding()) {
+            result = duplicateRangeByShard(result);
+        }
         logger.info("hbasekeyrange count after dropping duplicatebyshard :" + 
result.size());
 
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/webapp/app/js/model/cubeDescModel.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/cubeDescModel.js 
b/webapp/app/js/model/cubeDescModel.js
index 3376bfc..c9dfe56 100644
--- a/webapp/app/js/model/cubeDescModel.js
+++ b/webapp/app/js/model/cubeDescModel.js
@@ -51,7 +51,8 @@ KylinApp.service('CubeDescModel', function () {
       },
       "retention_range": "0",
       "auto_merge_time_ranges": [604800000, 2419200000],
-      "engine_type": 2
+      "engine_type": 2,
+      "storage_type":2
     };
 
     return cubeMeta;

Reply via email to