Repository: incubator-kylin Updated Branches: refs/heads/KYLIN-1126 74536a8e6 -> 933919846
Rowkeyencoder work with all build/merge build engine Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/48d08ef6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/48d08ef6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/48d08ef6 Branch: refs/heads/KYLIN-1126 Commit: 48d08ef6e8848c941c11a8fbc090eb10a4f5e2ca Parents: 74536a8 Author: honma <ho...@ebay.com> Authored: Fri Nov 6 17:19:22 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Fri Nov 6 17:19:22 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/util/ByteArray.java | 5 ++ .../org/apache/kylin/common/util/BasicTest.java | 54 +++++++++++++---- .../java/org/apache/kylin/cube/CubeManager.java | 1 - .../java/org/apache/kylin/cube/CubeSegment.java | 11 +--- .../kylin/cube/common/RowKeySplitter.java | 32 +++++----- .../kylin/cube/kv/AbstractRowKeyEncoder.java | 16 ++++- .../org/apache/kylin/cube/kv/RowConstants.java | 4 +- .../org/apache/kylin/cube/kv/RowKeyDecoder.java | 2 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 19 +++++- .../kylin/cube/kv/RowKeyEncoderProvider.java | 45 ++++++++++++++ .../org/apache/kylin/cube/model/CubeDesc.java | 11 ++-- .../kylin/metadata/model/IStorageAware.java | 1 + .../kylin/engine/mr/BatchCubingJobBuilder.java | 11 +++- .../kylin/engine/mr/BatchMergeJobBuilder.java | 5 +- .../mr/steps/MergeCuboidFromStorageMapper.java | 64 ++++++++++++-------- .../engine/mr/steps/MergeCuboidMapper.java | 61 +++++++++++-------- .../kylin/engine/mr/steps/NDCuboidMapper.java | 41 ++++++------- .../spark/cube/DefaultTupleConverter.java | 30 ++++----- .../storage/hbase/cube/v1/CubeStorageQuery.java | 11 ++-- webapp/app/js/model/cubeDescModel.js | 3 +- 20 files changed, 282 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java index a388dda..ccd5001 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java @@ -90,6 +90,11 @@ public class ByteArray implements Comparable<ByteArray>, Serializable { set(o.data, o.offset, o.length); } + public void set(int offset, int length) { + this.offset = offset; + this.length = length; + } + public void setLength(int length) { this.length = length; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 259f977..2beb2c6 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -19,12 +19,12 @@ package org.apache.kylin.common.util; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; +import java.util.HashMap; import java.util.IdentityHashMap; import org.apache.commons.configuration.ConfigurationException; @@ -34,14 +34,14 @@ import org.junit.Test; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.TreeMultiset; -import com.sun.management.OperatingSystemMXBean; /** -* <p/> -* Keep this test case to test basic java functionality -* development concept proving use -*/ + * <p/> + * Keep this test case to test basic java functionality + * development concept proving use + */ @Ignore("convenient trial tool for dev") @SuppressWarnings("unused") public class BasicTest { @@ -73,14 +73,46 @@ public class BasicTest { Count, DimensionAsMetric, DistinctCount, Normal } + public static int counter = 1; + + class X { + byte[] mm = new byte[100]; + + public X() { + counter++; + } + } + @Test public void testxx() throws InterruptedException { - while (true) { - OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - System.out.println(operatingSystemMXBean.getSystemCpuLoad()); - System.out.println(operatingSystemMXBean.getFreePhysicalMemorySize()); - Thread.sleep(1000); + byte[][] data = new byte[10000000][]; + byte[] temp = new byte[100]; + for (int i = 0; i < 100; i++) { + temp[i] = (byte) i; + } + for (int i = 0; i < 10000000; i++) { + data[i] = new byte[100]; + } + + long wallClock = System.currentTimeMillis(); + + for (int i = 0; i < 10000000; i++) { + System.arraycopy(temp, 0, data[i], 0, 100); + } + System.out.println("Time Consumed: " + (System.currentTimeMillis() - wallClock)); + } + + @Test + public void testyy() throws InterruptedException { + long wallClock = System.currentTimeMillis(); + + HashMap<Integer, byte[]> map = Maps.newHashMap(); + for (int i = 0; i < 10000000; i++) { + byte[] a = new byte[100]; + map.put(i, a); } + + System.out.println("Time Consumed: " + (System.currentTimeMillis() - wallClock)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 3ead061..2232f01 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -568,7 +568,6 @@ public class CubeManager implements IRealizationProvider { segment.setDateRangeEnd(endDate); segment.setStatus(SegmentStatusEnum.NEW); segment.setStorageLocationIdentifier(generateStorageLocation()); - segment.setEnableSharding(true); segment.setCubeInstance(cubeInstance); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 1a34596..7999ea3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.IDictionaryAware; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -70,8 +71,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I private String lastBuildJobID; @JsonProperty("create_time_utc") private long createTimeUTC; - @JsonProperty("enable_sharding") - private boolean enableSharding = true; @JsonProperty("cuboid_shard_nums") private Map<Long, Short> cuboidShardNums = Maps.newHashMap(); @JsonProperty("total_shards") @@ -372,15 +371,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I } public boolean isEnableSharding() { - return enableSharding; - } - - public void setEnableSharding(boolean enableSharding) { - this.enableSharding = enableSharding; + return getCubeDesc().isEnableSharding(); } public int getRowKeyPreambleSize() { - return enableSharding ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN; + return isEnableSharding() ? RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN : RowConstants.ROWKEY_CUBOIDID_LEN; } /** http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java index 0111cee..56247bc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java @@ -36,25 +36,27 @@ public class RowKeySplitter { private int bufferSize; private long lastSplittedCuboidId; - private short lastSplittedShard; + private boolean enableSharding; public SplittedBytes[] getSplitBuffers() { return splitBuffers; } - public int getBufferSize() { - return bufferSize; + public int getBodySplitOffset() { + if (enableSharding) { + return 2;//shard+cuboid + } else { + return 1;//cuboid + } } - public long getLastSplittedCuboidId() { - return lastSplittedCuboidId; + public int getBufferSize() { + return bufferSize; } - public short getLastSplittedShard() { - return lastSplittedShard; - } public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) { + this.enableSharding = cubeSeg.isEnableSharding(); this.cubeDesc = cubeSeg.getCubeDesc(); this.colIO = new RowKeyColumnIO(cubeSeg); @@ -73,11 +75,14 @@ public class RowKeySplitter { this.bufferSize = 0; int offset = 0; - // extract shard - SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++]; - shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN; - System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN); - offset += RowConstants.ROWKEY_SHARDID_LEN; + if (enableSharding) { + // extract shard + SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++]; + shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN; + System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN); + offset += RowConstants.ROWKEY_SHARDID_LEN; + //lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length); + } // extract cuboid id SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++]; @@ -86,7 +91,6 @@ public class RowKeySplitter { offset += RowConstants.ROWKEY_CUBOIDID_LEN; lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length); - lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length); Cuboid cuboid = Cuboid.findById(cubeDesc, lastSplittedCuboidId); // rowkey columns http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java index c9a304e..4316376 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java @@ -20,6 +20,7 @@ package org.apache.kylin.cube.kv; import java.util.Map; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; @@ -39,9 +40,9 @@ public abstract class AbstractRowKeyEncoder { protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class); public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL; - protected final Cuboid cuboid; - protected final CubeSegment cubeSeg; protected byte blankByte = DEFAULT_BLANK_BYTE; + protected final CubeSegment cubeSeg; + protected Cuboid cuboid; public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) { return new RowKeyEncoder(cubeSeg, cuboid); @@ -60,6 +61,10 @@ public abstract class AbstractRowKeyEncoder { return cuboid.getId(); } + public void setCuboid(Cuboid cuboid) { + this.cuboid = cuboid; + } + abstract public byte[] createBuf(); /** @@ -70,6 +75,13 @@ public abstract class AbstractRowKeyEncoder { */ abstract public void encode(GTRecord record, ImmutableBitSet keyColumns, byte[] buf); + /** + * when a rowkey's body is provided, help to encode cuboid & shard (if apply) + * @param bodyBytes + * @param outputBuf + */ + abstract public void encode(ByteArray bodyBytes, ByteArray outputBuf); + abstract public byte[] encode(Map<TblColRef, String> valueMap); abstract public byte[] encode(byte[][] values); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java index 09bccc3..3c5d3ac 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java @@ -34,7 +34,7 @@ public class RowConstants { public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN; public static final int ROWKEY_SHARD_AND_CUBOID_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN; - + public static final byte BYTE_ZERO = 0; public static final byte BYTE_ONE = 1; @@ -43,7 +43,7 @@ public class RowConstants { public static final String ROWVALUE_DELIMITER_STRING = String.valueOf((char) 7); public static final byte[] ROWVALUE_DELIMITER_BYTES = { 7 }; - public static final int ROWKEY_BUFFER_SIZE = 1024 * 1024; // 1 MB + public static final int ROWKEY_BUFFER_SIZE = 65 * 256;// a little more than 64 dimensions * 256 bytes each public static final int ROWVALUE_BUFFER_SIZE = 1024 * 1024; // 1 MB // marker class http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java index 3506845..e4a6a52 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java @@ -58,7 +58,7 @@ public class RowKeyDecoder { SplittedBytes[] splits = rowKeySplitter.getSplitBuffers(); - int offset = 2; // skip shard and cuboid id part + int offset = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboid id part for (int i = 0; i < this.cuboid.getColumns().size(); i++) { TblColRef col = this.cuboid.getColumns().get(i); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index 4d1055b..990cf06 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -33,13 +33,15 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.base.Preconditions; + public class RowKeyEncoder extends AbstractRowKeyEncoder { private int bodyLength = 0; private RowKeyColumnIO colIO; protected boolean enableSharding; - protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) { + public RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) { super(cubeSeg, cuboid); enableSharding = cubeSeg.isEnableSharding(); colIO = new RowKeyColumnIO(cubeSeg); @@ -48,11 +50,11 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { } } - protected int getHeaderLength() { + public int getHeaderLength() { return cubeSeg.getRowKeyPreambleSize(); } - protected int getBytesLength() { + public int getBytesLength() { return getHeaderLength() + bodyLength; } @@ -86,6 +88,17 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { } @Override + public void encode(ByteArray bodyBytes, ByteArray outputBuf) { + Preconditions.checkState(bodyBytes.length() == bodyLength); + Preconditions.checkState(bodyBytes.length() + getHeaderLength() == outputBuf.length(),// + "bodybytes length: " + bodyBytes.length() + " outputBuf length: " + outputBuf.length() + " header length: " + getHeaderLength()); + System.arraycopy(bodyBytes.array(), bodyBytes.offset(), outputBuf.array(), getHeaderLength(), bodyLength); + + //fill shard and cuboid + fillHeader(outputBuf.array()); + } + + @Override public byte[] encode(Map<TblColRef, String> valueMap) { List<byte[]> valueList = new ArrayList<byte[]>(); for (TblColRef bdCol : cuboid.getColumns()) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java new file mode 100644 index 0000000..cb36259 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoderProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.kv; + +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; + +/** + * thread unsafe + */ +public class RowKeyEncoderProvider { + + private CubeSegment cubeSegment; + private RowKeyEncoder rowKeyEncoder; + + public RowKeyEncoderProvider(CubeSegment cubeSegment) { + this.cubeSegment = cubeSegment; + } + + public RowKeyEncoder getRowkeyEncoder(Cuboid cuboid) { + if (rowKeyEncoder == null) { + rowKeyEncoder = new RowKeyEncoder(cubeSegment, cuboid); + } + if (rowKeyEncoder.getCuboidID() != cuboid.getId()) { + rowKeyEncoder.setCuboid(cuboid); + } + return rowKeyEncoder; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index b51e6cb..eced0f2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -134,6 +134,10 @@ public class CubeDesc extends RootPersistentEntity { private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap(); private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap(); + public boolean isEnableSharding() { + return storageType == IStorageAware.ID_SHARDED_HBASE; + } + /** * Error messages during resolving json metadata */ @@ -662,7 +666,7 @@ public class CubeDesc extends RootPersistentEntity { if (colRefs.isEmpty() == false) p.setColRefs(colRefs); - + // verify holistic count distinct as a dependent measure if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) { throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!"); @@ -822,17 +826,16 @@ public class CubeDesc extends RootPersistentEntity { this.engineType = engineType; } - public List<TblColRef> getAllColumnsNeedDictionary() { List<TblColRef> result = Lists.newArrayList(); - + for (RowKeyColDesc rowKeyColDesc : rowkey.getRowKeyColumns()) { TblColRef colRef = rowKeyColDesc.getColRef(); if (rowkey.isUseDictionary(colRef)) { result.add(colRef); } } - + for (TblColRef colRef : measureDisplayColumns) { if (!result.contains(colRef)) result.add(colRef); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java index ea1aae9..e552574 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java @@ -22,6 +22,7 @@ public interface IStorageAware { public static final int ID_HBASE = 0; public static final int ID_HYBRID = 1; + public static final int ID_SHARDED_HBASE = 2; int getStorageType(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index d00f592..382a124 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -28,22 +28,27 @@ import org.apache.kylin.job.constant.ExecutableConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class BatchCubingJobBuilder extends JobBuilderSupport { - + private static final Logger logger = LoggerFactory.getLogger(BatchCubingJobBuilder.class); - + private final IMRBatchCubingInputSide inputSide; private final IMRBatchCubingOutputSide outputSide; public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) { super(newSegment, submitter); + + Preconditions.checkArgument(!newSegment.isEnableSharding(), "V1 job engine does not support building sharded cubes"); + this.inputSide = MRUtil.getBatchCubingInputSide(seg); this.outputSide = MRUtil.getBatchCubingOutputSide(seg); } public CubingJob build() { logger.info("MR_V1 new job to BUILD segment " + seg); - + final CubingJob result = CubingJob.createBuildJob(seg, submitter, config); final String jobId = result.getId(); final String cuboidRootPath = getCuboidRootPath(jobId); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java index bc377ed..1743573 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java @@ -39,12 +39,15 @@ public class BatchMergeJobBuilder extends JobBuilderSupport { public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) { super(mergeSegment, submitter); + + Preconditions.checkArgument(!mergeSegment.isEnableSharding(), "V1 job engine does not support merging sharded cubes"); + this.outputSide = MRUtil.getBatchMergeOutputSide(seg); } public CubingJob build() { logger.info("MR_V1 new job to MERGE segment " + seg); - + final CubingJob result = CubingJob.createMergeJob(seg, submitter, config); final String jobId = result.getId(); final String cuboidRootPath = getCuboidRootPath(jobId); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index 9b25c97..50f3d4c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -20,10 +20,10 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.HashMap; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SplittedBytes; @@ -33,6 +33,8 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryManager; @@ -68,8 +70,10 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By private IMRStorageInputFormat storageInputFormat; private ByteArrayWritable outputKey = new ByteArrayWritable(); - private byte[] newKeyBuf; + private byte[] newKeyBodyBuf; + private ByteArray newKeyBuf; private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); @@ -106,12 +110,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat(); - newKeyBuf = new byte[256]; // size will auto-grow + newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; // size will auto-grow + newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); sourceCubeSegment = storageInputFormat.findSourceSegment(context); logger.info("Source cube segment: " + sourceCubeSegment); rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); + rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); codec = new MeasureCodec(cubeDesc.getMeasures()); } @@ -125,19 +131,15 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By Preconditions.checkState(key.offset() == 0); long cuboidID = rowKeySplitter.split(key.array()); - short shard = rowKeySplitter.getLastSplittedShard(); Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); int bufOffset = 0; - - BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN); - bufOffset += RowConstants.ROWKEY_SHARDID_LEN; - - BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN); - bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN; + int bodySplitOffset = rowKeySplitter.getBodySplitOffset(); for (int i = 0; i < cuboid.getColumns().size(); ++i) { + int useSplit = i + bodySplitOffset; TblColRef col = cuboid.getColumns().get(i); if (this.checkNeedMerging(col)) { @@ -146,38 +148,48 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); - while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); + while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) { + //also use this buf to hold value before translating + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length); + int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length); + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset); int idInMergedDict; if (size < 0) { idInMergedDict = mergedDict.nullId(); } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size); + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); } - BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId()); + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); bufOffset += mergedDict.getSizeOfId(); } else { // keep as it is - while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); + while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length); - bufOffset += splittedByteses[i + 1].length; + System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length); + bufOffset += splittedByteses[useSplit].length; } } - byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset); - outputKey.set(newKey, 0, newKey.length); + + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf.set(new byte[newKeyBuf.length() * 2]); + } + newKeyBuf.set(0, fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); valueBuf.clear(); codec.encode(value, valueBuf); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index 6301f3d..0b68e59 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -19,7 +19,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -27,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; @@ -35,6 +35,8 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryManager; @@ -60,8 +62,10 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private Text outputKey = new Text(); - private byte[] newKeyBuf; + private byte[] newKeyBodyBuf; + private ByteArray newKeyBuf; private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>(); @@ -95,13 +99,15 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; - newKeyBuf = new byte[256];// size will auto-grow + newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow + newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); // decide which source segment FileSplit fileSplit = (FileSplit) context.getInputSplit(); sourceCubeSegment = findSourceSegment(fileSplit, cube); rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255); + rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment); } private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); @@ -135,17 +141,15 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidID = rowKeySplitter.split(key.getBytes()); - short shard = rowKeySplitter.getLastSplittedShard(); Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID); + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers(); int bufOffset = 0; - BytesUtil.writeShort(shard, newKeyBuf, bufOffset, RowConstants.ROWKEY_SHARDID_LEN); - bufOffset += RowConstants.ROWKEY_SHARDID_LEN; - BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN); - bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN; + int bodySplitOffset = rowKeySplitter.getBodySplitOffset(); for (int i = 0; i < cuboid.getColumns().size(); ++i) { + int useSplit = i + bodySplitOffset; TblColRef col = cuboid.getColumns().get(i); if (this.checkNeedMerging(col)) { @@ -154,38 +158,47 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col)); Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col)); - while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); + while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || // + mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length); + int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length); int idInMergedDict; - int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset); + int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset); if (size < 0) { idInMergedDict = mergedDict.nullId(); } else { - idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size); + idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size); } - BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId()); + BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()); bufOffset += mergedDict.getSizeOfId(); } else { // keep as it is - while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) { - byte[] oldBuf = newKeyBuf; - newKeyBuf = new byte[2 * newKeyBuf.length]; - System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length); + while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) { + byte[] oldBuf = newKeyBodyBuf; + newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length]; + System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length); } - System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length); - bufOffset += splittedByteses[i + 1].length; + System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length); + bufOffset += splittedByteses[useSplit].length; } } - byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset); - outputKey.set(newKey, 0, newKey.length); + + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf.set(new byte[newKeyBuf.length() * 2]); + } + newKeyBuf.set(0, fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); context.write(outputKey, value); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 2180dd6..9bebfdb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -23,8 +23,7 @@ import java.util.Collection; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ShardingHash; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.SplittedBytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -33,6 +32,8 @@ import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@ -59,8 +60,10 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private int handleCounter; private int skipCounter; - private byte[] keyBuf = new byte[4096]; + private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; + private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; @Override protected void setup(Context context) throws IOException { @@ -79,19 +82,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { cuboidScheduler = new CuboidScheduler(cubeDesc); rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); + rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment); } private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { - int offset = 0; - - //shard id will be filled after other contents - offset += RowConstants.ROWKEY_SHARDID_LEN; - - // cuboid id - System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length); - offset += RowConstants.ROWKEY_CUBOIDID_LEN; + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid); - int bodyOffset = offset; + int offset = 0; // rowkey columns long mask = Long.highestOneBit(parentCuboid.getId()); @@ -104,7 +101,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { // 1 if ((mask & childCuboidId) > 0) {// if the child cuboid has this // column - System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length); + System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length); offset += splitBuffers[index].length; } index++; @@ -112,13 +109,15 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { mask = mask >> 1; } - //fill shard - short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId); - short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum); - short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards()); - BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf.set(new byte[newKeyBuf.length() * 2]); + } + newKeyBuf.set(0, fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf); - return offset; + return fullKeySize; } @Override @@ -147,8 +146,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { for (Long child : myChildren) { Cuboid childCuboid = Cuboid.findById(cubeDesc, child); - int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); - outputKey.set(keyBuf, 0, keyLength); + int fullKeySize = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); context.write(outputKey, value); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java index 986e45e..6b3a82c 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java @@ -22,12 +22,11 @@ import java.util.BitSet; import java.util.Map; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; @@ -42,11 +41,14 @@ public final class DefaultTupleConverter implements TupleConverter { private final CubeSegment segment; private final int measureCount; private final Map<TblColRef, Integer> columnLengthMap; + private RowKeyEncoderProvider rowKeyEncoderProvider; + private byte[] rowKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) { this.segment = segment; this.measureCount = segment.getCubeDesc().getMeasures().size(); this.columnLengthMap = columnLengthMap; + this.rowKeyEncoderProvider = new RowKeyEncoderProvider(this.segment); } private ByteBuffer getValueBuf() { @@ -65,11 +67,8 @@ public final class DefaultTupleConverter implements TupleConverter { @Override public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) { - int bytesLength = RowConstants.ROWKEY_HEADER_LEN; Cuboid cuboid = Cuboid.findById(segment.getCubeDesc(), cuboidId); - for (TblColRef column : cuboid.getColumns()) { - bytesLength += columnLengthMap.get(column); - } + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); final int dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality(); int[] measureColumnsIndex = getMeasureColumnsIndex(); @@ -77,22 +76,15 @@ public final class DefaultTupleConverter implements TupleConverter { measureColumnsIndex[i] = dimensions + i; } - byte[] key = new byte[bytesLength]; - System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN); - int header = RowConstants.ROWKEY_HEADER_LEN; - int offSet = header; + int offSet = 0; for (int x = 0; x < dimensions; x++) { final ByteArray byteArray = record.get(x); - System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length()); + System.arraycopy(byteArray.array(), byteArray.offset(), rowKeyBodyBuf, offSet, byteArray.length()); offSet += byteArray.length(); } - //fill shard - short cuboidShardNum = segment.getCuboidShardNum(cuboidId); - short shardOffset = ShardingHash.getShard(key, header, offSet - header, cuboidShardNum); - short cuboidShardBase = segment.getCuboidBaseShard(cuboidId); - short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, segment.getTotalShards()); - BytesUtil.writeShort(finalShard, key, 0, RowConstants.ROWKEY_SHARDID_LEN); + byte[] rowKey = rowkeyEncoder.createBuf(); + rowkeyEncoder.encode(new ByteArray(rowKeyBodyBuf), new ByteArray(rowKey)); ByteBuffer valueBuf = getValueBuf(); valueBuf.clear(); @@ -100,6 +92,6 @@ public final class DefaultTupleConverter implements TupleConverter { byte[] value = new byte[valueBuf.position()]; System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position()); - return new Tuple2<>(key, value); + return new Tuple2<>(rowKey, value); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index c62308e..f84e4e6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -33,9 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; @@ -75,6 +72,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; @SuppressWarnings("unused") public class CubeStorageQuery implements ICachableStorageQuery { @@ -482,7 +482,10 @@ public class CubeStorageQuery implements ICachableStorageQuery { dropUnhitSegments(result); logger.info("hbasekeyrange count after dropping unhit :" + result.size()); - result = duplicateRangeByShard(result); + //TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like CubeHBaseRPC, not do so because v1 query engine is retiring. not worth changing it + if (cubeDesc.isEnableSharding()) { + result = duplicateRangeByShard(result); + } logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size()); return result; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48d08ef6/webapp/app/js/model/cubeDescModel.js ---------------------------------------------------------------------- diff --git a/webapp/app/js/model/cubeDescModel.js b/webapp/app/js/model/cubeDescModel.js index 3376bfc..c9dfe56 100644 --- a/webapp/app/js/model/cubeDescModel.js +++ b/webapp/app/js/model/cubeDescModel.js @@ -51,7 +51,8 @@ KylinApp.service('CubeDescModel', function () { }, "retention_range": "0", "auto_merge_time_ranges": [604800000, 2419200000], - "engine_type": 2 + "engine_type": 2, + "storage_type":2 }; return cubeMeta;