KYLIN-1283 Replace GTScanRequest's SerDer from Kryo to manual
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2ac2f56a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2ac2f56a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2ac2f56a Branch: refs/heads/2.x-staging Commit: 2ac2f56a138e9c014796423bcddb1a01248fe218 Parents: ec5c28d Author: honma <ho...@ebay.com> Authored: Wed Dec 30 22:20:24 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Mon Jan 4 21:39:54 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/util/ByteArray.java | 10 ++ .../kylin/common/util/ImmutableBitSet.java | 34 ++++-- .../org/apache/kylin/common/util/BasicTest.java | 10 -- .../kylin/cube/gridtable/CubeCodeSystem.java | 96 ++------------- .../kylin/cube/gridtable/FixLenSerializer.java | 103 ++++++++++++++++ .../cube/gridtable/TrimmedCubeCodeSystem.java | 103 ++++++++++++++-- .../cube/inmemcubing/InMemCubeBuilder.java | 7 +- .../java/org/apache/kylin/gridtable/GTInfo.java | 100 ++++++++++++---- .../org/apache/kylin/gridtable/GTRecord.java | 9 +- .../kylin/gridtable/GTSampleCodeSystem.java | 12 ++ .../org/apache/kylin/gridtable/GTScanRange.java | 2 + .../kylin/gridtable/GTScanRangePlanner.java | 7 +- .../apache/kylin/gridtable/GTScanRequest.java | 84 ++++++++++++-- .../java/org/apache/kylin/gridtable/GTUtil.java | 10 ++ .../inmemcubing/ConcurrentDiskStoreTest.java | 2 +- .../cube/inmemcubing/MemDiskStoreTest.java | 2 +- .../gridtable/AggregationCacheSpillTest.java | 6 +- .../kylin/gridtable/DictGridTableTest.java | 41 +++++-- .../kylin/gridtable/GTScanReqSerDerTest.java | 116 +++++++++++++++++++ .../kylin/gridtable/SimpleGridTableTest.java | 4 +- .../metadata/datatype/BigDecimalSerializer.java | 6 +- .../kylin/metadata/datatype/DataType.java | 30 ++++- .../index/TableRecordInfoDigest.java | 8 +- .../coprocessor/CoprocessorConstants.java | 26 ----- .../coprocessor/CoprocessorProjector.java | 9 +- 
.../common/coprocessor/CoprocessorRowType.java | 10 +- .../observer/ObserverAggregators.java | 13 +-- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 18 ++- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 8 +- .../hbase/cube/v2/CubeSegmentScanner.java | 7 +- .../storage/hbase/cube/v2/CubeStorageQuery.java | 1 - .../kylin/storage/hbase/cube/v2/RawScan.java | 56 ++++++++- .../coprocessor/endpoint/CubeVisitService.java | 5 +- .../endpoint/EndpointAggregators.java | 11 +- .../ii/coprocessor/endpoint/IIEndpoint.java | 7 +- 35 files changed, 702 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java index ccd5001..015d5c5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java @@ -105,6 +105,16 @@ public class ByteArray implements Comparable<ByteArray>, Serializable { return copy; } + //notice this will have a length header + public void exportData(ByteBuffer out) { + BytesUtil.writeByteArray(this.data, this.offset, this.length, out); + } + + public static ByteArray importData(ByteBuffer in) { + byte[] bytes = BytesUtil.readByteArray(in); + return new ByteArray(bytes); + } + public void copyFrom(ByteArray other) { System.arraycopy(other.array(), other.offset, data, offset, other.length); this.length = other.length; http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java ---------------------------------------------------------------------- diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java index f5a22d2..a36bcef 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java @@ -17,6 +17,7 @@ */ package org.apache.kylin.common.util; +import java.nio.ByteBuffer; import java.util.BitSet; public class ImmutableBitSet { @@ -30,6 +31,16 @@ public class ImmutableBitSet { this(newBitSet(index)); } + public ImmutableBitSet(BitSet set) { + this.set = (BitSet) set.clone(); + this.arr = new int[set.cardinality()]; + + int j = 0; + for (int i = set.nextSetBit(0); i >= 0; i = set.nextSetBit(i + 1)) { + arr[j++] = i; + } + } + private static BitSet newBitSet(int index) { BitSet set = new BitSet(index); set.set(index); @@ -46,16 +57,6 @@ public class ImmutableBitSet { return set; } - public ImmutableBitSet(BitSet set) { - this.set = (BitSet) set.clone(); - this.arr = new int[set.cardinality()]; - - int j = 0; - for (int i = set.nextSetBit(0); i >= 0; i = set.nextSetBit(i + 1)) { - arr[j++] = i; - } - } - /** return number of true bits */ public int trueBitCount() { return arr.length; @@ -145,4 +146,17 @@ public class ImmutableBitSet { public boolean isEmpty() { return set.isEmpty(); } + + public static final BytesSerializer<ImmutableBitSet> serializer = new BytesSerializer<ImmutableBitSet>() { + @Override + public void serialize(ImmutableBitSet value, ByteBuffer out) { + BytesUtil.writeByteArray(value.set.toByteArray(), out); + } + + @Override + public ImmutableBitSet deserialize(ByteBuffer in) { + BitSet bitSet = BitSet.valueOf(BytesUtil.readByteArray(in)); + return new ImmutableBitSet(bitSet); + } + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff 
--git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 22d92d2..ae96e67 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -50,15 +50,6 @@ import com.google.common.collect.Maps; public class BasicTest { protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(BasicTest.class); - class A { - public void foo() { - System.out.println(this.getClass().getName()); - } - } - - class B extends A { - } - private void log(ByteBuffer a) { Integer x = 4; foo(x); @@ -86,7 +77,6 @@ public class BasicTest { } } - @Test public void testxx() throws InterruptedException { byte[][] data = new byte[10000000][]; http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index dc906f6..f83f920 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -1,15 +1,12 @@ package org.apache.kylin.cube.gridtable; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collections; import java.util.Map; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.gridtable.DefaultGTComparator; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; @@ -17,6 +14,8 @@ import 
org.apache.kylin.gridtable.IGTComparator; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import com.google.common.collect.Maps; + /** * defines how column values will be encoded to/ decoded from GTRecord * @@ -46,18 +45,19 @@ public class CubeCodeSystem implements IGTCodeSystem { this.dependentMetricsMap = dependentMetricsMap; } - public IGTCodeSystem trimForCoprocessor() { - DataTypeSerializer[] newSerializers = new DataTypeSerializer[serializers.length]; + public TrimmedCubeCodeSystem trimForCoprocessor() { + Map<Integer,Integer> dictSizes = Maps.newHashMap(); + Map<Integer,Integer> fixedLengthSizes = Maps.newHashMap(); for (int i = 0; i < serializers.length; i++) { if (serializers[i] instanceof DictionarySerializer) { - newSerializers[i] = new TrimmedDictionarySerializer(serializers[i].maxLength()); - } else { - newSerializers[i] = serializers[i]; + dictSizes.put(i,serializers[i].maxLength()); + } else if(serializers[i] instanceof FixLenSerializer) { + fixedLengthSizes.put(i,serializers[i].maxLength()); } } - return new TrimmedCubeCodeSystem(info, dependentMetricsMap, newSerializers, comparator); + return new TrimmedCubeCodeSystem(dependentMetricsMap,dictSizes,fixedLengthSizes); } @Override @@ -80,6 +80,7 @@ public class CubeCodeSystem implements IGTCodeSystem { } } + //when changing this, also take care of TrimmedCubeCodeSystem.init this.comparator = new DefaultGTComparator(); } @@ -156,7 +157,6 @@ public class CubeCodeSystem implements IGTCodeSystem { public TrimmedDictionarySerializer(int fieldSize) { this.fieldSize = fieldSize; - } @Override @@ -226,80 +226,4 @@ public class CubeCodeSystem implements IGTCodeSystem { } - static class FixLenSerializer extends DataTypeSerializer { - - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); - - private int fixLen; - - FixLenSerializer(int fixLen) { - this.fixLen = fixLen; - } - - 
private byte[] currentBuf() { - byte[] buf = current.get(); - if (buf == null) { - buf = new byte[fixLen]; - current.set(buf); - } - return buf; - } - - @Override - public void serialize(Object value, ByteBuffer out) { - byte[] buf = currentBuf(); - if (value == null) { - Arrays.fill(buf, Dictionary.NULL); - out.put(buf); - } else { - byte[] bytes = Bytes.toBytes(value.toString()); - if (bytes.length > fixLen) { - throw new IllegalStateException("Expect at most " + fixLen + " bytes, but got " + bytes.length + ", value string: " + value.toString()); - } - out.put(bytes); - for (int i = bytes.length; i < fixLen; i++) { - out.put(RowConstants.ROWKEY_PLACE_HOLDER_BYTE); - } - } - } - - @Override - public Object deserialize(ByteBuffer in) { - byte[] buf = currentBuf(); - in.get(buf); - - int tail = fixLen; - while (tail > 0 && (buf[tail - 1] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE || buf[tail - 1] == Dictionary.NULL)) { - tail--; - } - - if (tail == 0) { - return buf[0] == Dictionary.NULL ? 
null : ""; - } - - return Bytes.toString(buf, 0, tail); - } - - @Override - public int peekLength(ByteBuffer in) { - return fixLen; - } - - @Override - public int maxLength() { - return fixLen; - } - - @Override - public int getStorageBytesEstimate() { - return fixLen; - } - - @Override - public Object valueOf(String str) { - return str; - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/FixLenSerializer.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/FixLenSerializer.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/FixLenSerializer.java new file mode 100644 index 0000000..2dc1d60 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/FixLenSerializer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cube.gridtable; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +public class FixLenSerializer extends DataTypeSerializer { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); + + private int fixLen; + + FixLenSerializer(int fixLen) { + this.fixLen = fixLen; + } + + private byte[] currentBuf() { + byte[] buf = current.get(); + if (buf == null) { + buf = new byte[fixLen]; + current.set(buf); + } + return buf; + } + + @Override + public void serialize(Object value, ByteBuffer out) { + byte[] buf = currentBuf(); + if (value == null) { + Arrays.fill(buf, Dictionary.NULL); + out.put(buf); + } else { + byte[] bytes = Bytes.toBytes(value.toString()); + if (bytes.length > fixLen) { + throw new IllegalStateException("Expect at most " + fixLen + " bytes, but got " + bytes.length + ", value string: " + value.toString()); + } + out.put(bytes); + for (int i = bytes.length; i < fixLen; i++) { + out.put(RowConstants.ROWKEY_PLACE_HOLDER_BYTE); + } + } + } + + @Override + public Object deserialize(ByteBuffer in) { + byte[] buf = currentBuf(); + in.get(buf); + + int tail = fixLen; + while (tail > 0 && (buf[tail - 1] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE || buf[tail - 1] == Dictionary.NULL)) { + tail--; + } + + if (tail == 0) { + return buf[0] == Dictionary.NULL ? 
null : ""; + } + + return Bytes.toString(buf, 0, tail); + } + + @Override + public int peekLength(ByteBuffer in) { + return fixLen; + } + + @Override + public int maxLength() { + return fixLen; + } + + @Override + public int getStorageBytesEstimate() { + return fixLen; + } + + @Override + public Object valueOf(String str) { + return str; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index 19ebaef..6048ba0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -23,31 +23,56 @@ package org.apache.kylin.cube.gridtable; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.DefaultGTComparator; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.IGTCodeSystem; import org.apache.kylin.gridtable.IGTComparator; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import com.google.common.collect.Maps; + @SuppressWarnings({ "rawtypes", "unchecked" }) public class TrimmedCubeCodeSystem implements IGTCodeSystem { - private GTInfo info; - private Map<Integer, Integer> dependentMetricsMap; - private DataTypeSerializer[] serializers; - private IGTComparator comparator; + private Map<Integer, Integer> dictSizes; + private Map<Integer, Integer> fixedLengthSize; - public TrimmedCubeCodeSystem(GTInfo info, 
Map<Integer, Integer> dependentMetricsMap, DataTypeSerializer[] serializers, IGTComparator comparator) { - this.info = info; + private transient GTInfo info; + private transient DataTypeSerializer[] serializers; + private transient IGTComparator comparator; + + public TrimmedCubeCodeSystem(Map<Integer, Integer> dependentMetricsMap, Map<Integer, Integer> dictSizes, Map<Integer, Integer> fixedLengthSize) { this.dependentMetricsMap = dependentMetricsMap; - this.serializers = serializers; - this.comparator = comparator; + this.dictSizes = dictSizes; + this.fixedLengthSize = fixedLengthSize; } @Override public void init(GTInfo info) { + this.info = info; + + serializers = new DataTypeSerializer[info.getColumnCount()]; + for (int i = 0; i < info.getColumnCount(); i++) { + // dimension with dictionary + if (dictSizes.get(i) != null) { + serializers[i] = new CubeCodeSystem.TrimmedDictionarySerializer(dictSizes.get(i)); + } + // dimension of fixed length + else if (fixedLengthSize.get(i) != null) { + serializers[i] = new FixLenSerializer(fixedLengthSize.get(i)); + } + // metrics + else { + serializers[i] = DataTypeSerializer.create(info.getColumnType(i)); + } + } + + this.comparator = new DefaultGTComparator(); } @Override @@ -74,10 +99,10 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem { public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) { DataTypeSerializer serializer = serializers[col]; -// if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) { -// value = serializer.valueOf((String) value); -// } - + // if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) { + // value = serializer.valueOf((String) value); + // } + serializer.serialize(value, buf); } @@ -115,4 +140,58 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem { return result; } + public 
static final BytesSerializer<TrimmedCubeCodeSystem> serializer = new BytesSerializer<TrimmedCubeCodeSystem>() { + @Override + public void serialize(TrimmedCubeCodeSystem value, ByteBuffer out) { + BytesUtil.writeVInt(value.dependentMetricsMap.size(), out); + for (Map.Entry<Integer, Integer> x : value.dependentMetricsMap.entrySet()) { + BytesUtil.writeVInt(x.getKey(), out); + BytesUtil.writeVInt(x.getValue(), out); + } + + BytesUtil.writeVInt(value.dictSizes.size(), out); + for (Map.Entry<Integer, Integer> x : value.dictSizes.entrySet()) { + BytesUtil.writeVInt(x.getKey(), out); + BytesUtil.writeVInt(x.getValue(), out); + } + + BytesUtil.writeVInt(value.fixedLengthSize.size(), out); + for (Map.Entry<Integer, Integer> x : value.fixedLengthSize.entrySet()) { + BytesUtil.writeVInt(x.getKey(), out); + BytesUtil.writeVInt(x.getValue(), out); + } + } + + @Override + public TrimmedCubeCodeSystem deserialize(ByteBuffer in) { + Map<Integer, Integer> dependentMetricsMap = Maps.newHashMap(); + Map<Integer, Integer> dictSizes = Maps.newHashMap(); + Map<Integer, Integer> fixedLengthSize = Maps.newHashMap(); + + int size = 0; + + size = BytesUtil.readVInt(in); + for (int i = 0; i < size; ++i) { + int key = BytesUtil.readVInt(in); + int value = BytesUtil.readVInt(in); + dependentMetricsMap.put(key, value); + } + + size = BytesUtil.readVInt(in); + for (int i = 0; i < size; ++i) { + int key = BytesUtil.readVInt(in); + int value = BytesUtil.readVInt(in); + dictSizes.put(key, value); + } + + size = BytesUtil.readVInt(in); + for (int i = 0; i < size; ++i) { + int key = BytesUtil.readVInt(in); + int value = BytesUtil.readVInt(in); + fixedLengthSize.put(key, value); + } + return new TrimmedCubeCodeSystem(dependentMetricsMap, dictSizes, fixedLengthSize); + } + }; + } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index a1940a4..39f2a34 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -46,8 +46,6 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -331,7 +329,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input); Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount); - GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); + GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null, true, 0); GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req); aggregationScanner.trackMemoryLevel(baseCuboidMemTracker); @@ -399,7 +397,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { GTInfo info = gridTable.getInfo(); - GTScanRequest req = new GTScanRequest(info, null, 
aggregationColumns, measureColumns, metricsAggrFuncs, null); + GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null, true, 0); GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req); // for child cuboid, some measures don't need aggregation. @@ -417,6 +415,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { return scanner; } + private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException { long startTime = System.currentTimeMillis(); logger.info("Calculating cuboid " + cuboidId); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index 5479449..b6da738 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ -1,13 +1,16 @@ package org.apache.kylin.gridtable; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.BitSet; import java.util.Iterator; import java.util.LinkedList; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.gridtable.CubeCodeSystem; -import org.apache.kylin.cube.util.KryoUtils; +import org.apache.kylin.cube.gridtable.TrimmedCubeCodeSystem; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; @@ -18,27 +21,25 @@ public class GTInfo { } String tableName; - IGTCodeSystem codeSystem; // column schema - int nColumns; DataType[] colTypes; - ImmutableBitSet colAll; 
ImmutableBitSet colPreferIndex; + transient int nColumns; + transient ImmutableBitSet colAll; transient TblColRef[] colRefs; // grid info ImmutableBitSet primaryKey; // order by, uniqueness is not required ImmutableBitSet[] colBlocks; // primary key must be the first column block - ImmutableBitSet colBlocksAll; int rowBlockSize; // 0: disable row block + transient ImmutableBitSet colBlocksAll; // must create from builder private GTInfo() { } - public String getTableName() { return tableName; } @@ -94,7 +95,7 @@ public class GTInfo { } return result; } - + public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) { if (columns == null) columns = colAll; @@ -241,23 +242,76 @@ public class GTInfo { } } - public static byte[] serialize(GTInfo info) { - if (info.codeSystem instanceof CubeCodeSystem) { - CubeCodeSystem origin = (CubeCodeSystem) info.codeSystem; - info.codeSystem = origin.trimForCoprocessor(); - byte[] trimmedInfoBytes = KryoUtils.serialize(info); - info.codeSystem = origin; - return trimmedInfoBytes; - } else { - return KryoUtils.serialize(info); - } - } - - public static GTInfo deserialize(byte[] bytes) { - return KryoUtils.deserialize(bytes, GTInfo.class); - } - public IGTCodeSystem getCodeSystem() { return codeSystem; } + + public static final BytesSerializer<GTInfo> serializer = new BytesSerializer<GTInfo>() { + @Override + public void serialize(GTInfo value, ByteBuffer out) { + if (value.codeSystem instanceof CubeCodeSystem) { + BytesUtil.writeAsciiString(CubeCodeSystem.class.getCanonicalName(), out); + TrimmedCubeCodeSystem trimmed = ((CubeCodeSystem) value.codeSystem).trimForCoprocessor(); + TrimmedCubeCodeSystem.serializer.serialize(trimmed, out); + } else if (value.codeSystem instanceof GTSampleCodeSystem) { + BytesUtil.writeAsciiString(GTSampleCodeSystem.class.getCanonicalName(), out); + GTSampleCodeSystem.serializer.serialize((GTSampleCodeSystem) value.codeSystem, out); + } else { + throw new IllegalArgumentException("Can't recognize 
code system " + value.codeSystem.getClass()); + } + + BytesUtil.writeUTFString(value.tableName, out); + BytesUtil.writeVInt(value.colTypes.length, out); + for (DataType dataType : value.colTypes) { + DataType.serializer.serialize(dataType, out); + } + ImmutableBitSet.serializer.serialize(value.colPreferIndex, out); + ImmutableBitSet.serializer.serialize(value.primaryKey, out); + BytesUtil.writeVInt(value.colBlocks.length, out); + for (ImmutableBitSet x : value.colBlocks) { + ImmutableBitSet.serializer.serialize(x, out); + } + BytesUtil.writeVInt(value.rowBlockSize, out); + } + + @Override + public GTInfo deserialize(ByteBuffer in) { + IGTCodeSystem codeSystem = null; + String codeSystemType = BytesUtil.readAsciiString(in); + if (CubeCodeSystem.class.getCanonicalName().equals(codeSystemType)) { + codeSystem = TrimmedCubeCodeSystem.serializer.deserialize(in); + } else if (GTSampleCodeSystem.class.getCanonicalName().equals(codeSystemType)) { + codeSystem = GTSampleCodeSystem.serializer.deserialize(in); + } else { + throw new IllegalArgumentException("Can't recognize code system " + codeSystemType); + } + + String newTableName = BytesUtil.readUTFString(in); + + int colTypesSize = BytesUtil.readVInt(in); + DataType[] newColTypes = new DataType[colTypesSize]; + for (int i = 0; i < colTypesSize; ++i) { + newColTypes[i] = DataType.serializer.deserialize(in); + } + + ImmutableBitSet newColPreferIndex = ImmutableBitSet.serializer.deserialize(in); + ImmutableBitSet newPrimaryKey = ImmutableBitSet.serializer.deserialize(in); + + int colBlockSize = BytesUtil.readVInt(in); + ImmutableBitSet[] newColBlocks = new ImmutableBitSet[colBlockSize]; + for (int i = 0; i < colBlockSize; ++i) { + newColBlocks[i] = ImmutableBitSet.serializer.deserialize(in); + } + + int newRowBlockSize = BytesUtil.readVInt(in); + + return GTInfo.builder().setCodeSystem(codeSystem).// + setTableName(newTableName).// + setColumns(newColTypes).// + setColumnPreferIndex(newColPreferIndex).// + 
setPrimaryKey(newPrimaryKey).// + enableColumnBlock(newColBlocks).// + enableRowBlock(newRowBlockSize).build(); + } + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index 98f6e2d..b671c88 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -11,10 +11,9 @@ import com.google.common.base.Preconditions; public class GTRecord implements Comparable<GTRecord> { - final GTInfo info; + final transient GTInfo info; final ByteArray[] cols; - - private ImmutableBitSet maskForEqualHashComp; + final ImmutableBitSet maskForEqualHashComp; public GTRecord(GTInfo info, ImmutableBitSet maskForEqualHashComp, ByteArray[] cols) { this.info = info; @@ -141,10 +140,6 @@ public class GTRecord implements Comparable<GTRecord> { return maskForEqualHashComp; } - public void maskForEqualHashComp(ImmutableBitSet set) { - this.maskForEqualHashComp = set; - } - @Override public boolean equals(Object obj) { if (this == obj) http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java index 77eb430..d68d7d4 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java @@ -2,6 +2,7 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; +import org.apache.kylin.common.util.BytesSerializer; 
import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.metadata.datatype.DataTypeSerializer; @@ -81,4 +82,15 @@ public class GTSampleCodeSystem implements IGTCodeSystem { return serializers[col].deserialize(buf); } + public static final BytesSerializer<GTSampleCodeSystem> serializer = new BytesSerializer<GTSampleCodeSystem>() { + @Override + public void serialize(GTSampleCodeSystem value, ByteBuffer out) { + } + + @Override + public GTSampleCodeSystem deserialize(ByteBuffer in) { + return new GTSampleCodeSystem(); + } + }; + } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java index 87f51df..1ca2cd0 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java @@ -11,6 +11,8 @@ public class GTScanRange { final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded final public List<GTRecord> fuzzyKeys; // partial matching primary keys + + public GTScanRange(GTRecord pkStart, GTRecord pkEnd) { this(pkStart, pkEnd, null); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java index 3d07623..f0e42a0 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java +++ 
b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java @@ -134,13 +134,16 @@ public class GTScanRangePlanner { List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS); for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { - GTRecord fuzzy = new GTRecord(info); + BitSet bitSet = new BitSet(info.getColumnCount()); for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { bitSet.set(entry.getKey()); + } + GTRecord fuzzy = new GTRecord(info, new ImmutableBitSet(bitSet)); + for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { fuzzy.set(entry.getKey(), entry.getValue()); } - fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet)); + result.add(fuzzy); } return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index ac99d4e..a73a950 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -1,15 +1,19 @@ package org.apache.kylin.gridtable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; public class GTScanRequest { @@ -17,7 +21,7 @@ public class GTScanRequest { private GTInfo info; 
private GTScanRange range; private ImmutableBitSet columns; - private ImmutableBitSet selectedColBlocks; + private transient ImmutableBitSet selectedColBlocks; // optional filtering private TupleFilter filterPushDown; @@ -31,9 +35,72 @@ public class GTScanRequest { private boolean allowPreAggregation = true; private double aggrCacheGB = 0; // no limit - public GTScanRequest(GTInfo info) { - this(info, null, null, null); - } + public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() { + @Override + public void serialize(GTScanRequest value, ByteBuffer out) { + GTInfo.serializer.serialize(value.info, out); + + serializeGTRecord(value.range.pkStart, out); + serializeGTRecord(value.range.pkEnd, out); + BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out); + for (GTRecord f : value.range.fuzzyKeys) { + serializeGTRecord(f, out); + } + + ImmutableBitSet.serializer.serialize(value.columns, out); + BytesUtil.writeByteArray(GTUtil.serializeGTFilter(value.filterPushDown, value.info), out); + + ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out); + ImmutableBitSet.serializer.serialize(value.aggrMetrics, out); + BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out); + BytesUtil.writeVInt(value.allowPreAggregation ? 
1 : 0, out); + out.putDouble(value.aggrCacheGB); + } + + @Override + public GTScanRequest deserialize(ByteBuffer in) { + GTInfo sInfo = GTInfo.serializer.deserialize(in); + + GTRecord sPkStart = deserializeGTRecord(in, sInfo); + GTRecord sPkEnd = deserializeGTRecord(in, sInfo); + List<GTRecord> sFuzzyKeys = Lists.newArrayList(); + int sFuzzyKeySize = BytesUtil.readVInt(in); + for (int i = 0; i < sFuzzyKeySize; i++) { + sFuzzyKeys.add(deserializeGTRecord(in, sInfo)); + } + GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys); + + ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); + TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); + + ImmutableBitSet sAggGroupBy = ImmutableBitSet.serializer.deserialize(in); + ImmutableBitSet sAggrMetrics = ImmutableBitSet.serializer.deserialize(in); + String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in); + boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1); + double sAggrCacheGB = in.getDouble(); + + return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB); + } + + private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { + BytesUtil.writeVInt(gtRecord.cols.length, out); + for (ByteArray col : gtRecord.cols) { + col.exportData(out); + } + ImmutableBitSet.serializer.serialize(gtRecord.maskForEqualHashComp, out); + } + + private GTRecord deserializeGTRecord(ByteBuffer in, GTInfo sInfo) { + int colLength = BytesUtil.readVInt(in); + ByteArray[] sCols = new ByteArray[colLength]; + for (int i = 0; i < colLength; i++) { + sCols[i] = ByteArray.importData(in); + } + ImmutableBitSet sMaskForEqualHashComp = ImmutableBitSet.serializer.deserialize(in); + return new GTRecord(sInfo, sMaskForEqualHashComp, sCols); + } + + }; public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) { this.info = info; @@ -43,13 +110,8 @@ public 
class GTScanRequest { validate(info); } - public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet aggrGroupBy, ImmutableBitSet aggrMetrics, // - String[] aggrMetricsFuncs, TupleFilter filterPushDown) { - this(info, range, null, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true); - } - public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // - ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation) { + ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) { this.info = info; this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range; this.columns = dimensions; @@ -60,6 +122,7 @@ public class GTScanRequest { this.aggrMetricsFuncs = aggrMetricsFuncs; this.allowPreAggregation = allowPreAggregation; + this.aggrCacheGB = aggrCacheGB; validate(info); } @@ -76,6 +139,7 @@ public class GTScanRequest { if (columns == null) columns = ImmutableBitSet.EMPTY; + columns = columns.or(aggrGroupBy); columns = columns.or(aggrMetrics); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java index de9a5ce..7cf7f3c 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java @@ -28,6 +28,16 @@ public class GTUtil { return new TblColRef(desc); } + public static byte[] serializeGTFilter(TupleFilter gtFilter, GTInfo info) { + IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator()); + return TupleFilterSerializer.serialize(gtFilter, filterCodeSystem); + } 
+ + public static TupleFilter deserializeGTFilter(byte[] bytes, GTInfo info) { + IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator()); + return TupleFilterSerializer.deserialize(bytes, filterCodeSystem); + } + public static TupleFilter convertFilterUnevaluatable(TupleFilter rootFilter, GTInfo info, // Set<TblColRef> unevaluatableColumnCollector) { return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java index e6949b2..9a1029c 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java @@ -71,7 +71,7 @@ public class ConcurrentDiskStoreTest { t[i] = new Thread() { public void run() { try { - IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo())); + IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo(), null, null, null)); int i = 0; for (GTRecord r : scanner) { assertEquals(data.get(i++), r); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java index 54253d3..ff6a2ba 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java +++ 
b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java @@ -87,7 +87,7 @@ public class MemDiskStoreTest { } builder.close(); - IGTScanner scanner = table.scan(new GTScanRequest(info)); + IGTScanner scanner = table.scan(new GTScanRequest(info,null,null,null)); int i = 0; for (GTRecord r : scanner) { assertEquals(data.get(i++), r); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java index 7fd5ce7..0e399dd 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java @@ -76,8 +76,7 @@ public class AggregationCacheSpillTest { } }; - GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true); - scanRequest.setAggrCacheGB(0.5); // 500 MB + GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest); @@ -119,8 +118,7 @@ public class AggregationCacheSpillTest { }; // all-in-mem testcase - GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true); - scanRequest.setAggrCacheGB(0.5); // 500 MB + GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 
3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5); GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java index 836bafd..4588051 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -17,7 +17,7 @@ package org.apache.kylin.gridtable; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.math.BigDecimal; @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; @@ -49,6 +50,7 @@ import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -209,7 +211,28 @@ public class DictGridTableTest { @Test public void verifyFirstRow() throws IOException { - doScanAndVerify(table, new GTScanRequest(table.getInfo()), "[1421193600000, 30, Yang, 10, 10.5]"); + doScanAndVerify(table, new GTScanRequest(table.getInfo(), null, null, null), "[1421193600000, 30, Yang, 10, 10.5]",// + "[1421193600000, 30, Luke, 10, 10.5]",// + "[1421280000000, 20, Dong, 10, 10.5]",// + "[1421280000000, 20, Jason, 10, 
10.5]",// + "[1421280000000, 30, Xu, 10, 10.5]",// + "[1421366400000, 20, Mahone, 10, 10.5]",// + "[1421366400000, 20, Qianhao, 10, 10.5]",// + "[1421366400000, 30, George, 10, 10.5]",// + "[1421366400000, 30, Shaofeng, 10, 10.5]",// + "[1421452800000, 10, Kejia, 10, 10.5]"); + } + + //for testing GTScanRequest serialization and deserialization + private GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) { + ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + GTScanRequest.serializer.serialize(origin, buffer); + buffer.flip(); + GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer); + + Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs()); + Assert.assertEquals(origin.getAggrCacheGB(), sGTScanRequest.getAggrCacheGB(), 0.01); + return sGTScanRequest; } @Test @@ -221,12 +244,12 @@ public class DictGridTableTest { LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1))); LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable); - GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); + GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); // note the unEvaluatable column 1 in filter is added to group by assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); - doScanAndVerify(table, req, "[1421280000000, 20, null, 20, null]"); + doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]"); } @Test @@ -237,12 +260,11 @@ 
public class DictGridTableTest { CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10")); LogicalTupleFilter filter = and(fComp1, fComp2); - GTScanRequest req = new GTScanRequest(info, null, setOf(0), setOf(3), new String[] { "sum" }, filter); - + GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); // note the evaluatable column 1 in filter is added to returned columns but not in group by assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); - doScanAndVerify(table, req, "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); + doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); } @Test @@ -334,9 +356,10 @@ public class DictGridTableTest { int i = 0; for (GTRecord r : scanner) { System.out.println(r); - if (verifyRows != null && i < verifyRows.length) { - assertEquals(verifyRows[i], r.toString()); + if (verifyRows == null || i >= verifyRows.length) { + Assert.fail(); } + assertEquals(verifyRows[i], r.toString()); i++; } scanner.close(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java new file mode 100644 index 0000000..6642d95 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.gridtable; + +import java.nio.ByteBuffer; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.gridtable.NotEnoughGTInfoException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GTScanReqSerDerTest extends LocalFileMetadataTestCase { + + private ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + + @Before + public void setUp() throws Exception { + buffer.clear(); + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testImmutableBitSet() { + ImmutableBitSet x = new ImmutableBitSet(10, 100); + ImmutableBitSet.serializer.serialize(x, buffer); + + buffer.flip(); + + ImmutableBitSet sx = 
ImmutableBitSet.serializer.deserialize(buffer); + for (int i = 0; i < 10; i++) { + Assert.assertFalse(sx.get(i)); + } + for (int i = 10; i < 100; i++) { + Assert.assertTrue(sx.get(i)); + } + } + + @Test + public void testBasicInfo() { + GTInfo info = UnitTestSupport.basicInfo(); + GTInfo.serializer.serialize(info, buffer); + buffer.flip(); + + GTInfo sInfo = GTInfo.serializer.deserialize(buffer); + this.compareTwoGTInfo(info,sInfo); + } + + @Test + public void testAdvancedInfo() { + GTInfo info = UnitTestSupport.advancedInfo(); + GTInfo.serializer.serialize(info, buffer); + buffer.flip(); + + GTInfo sInfo = GTInfo.serializer.deserialize(buffer); + this.compareTwoGTInfo(info,sInfo); + } + + @Test + public void testGTInfo() throws NotEnoughGTInfoException { + CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready"); + CubeSegment segment = cube.getFirstSegment(); + + GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor())); + GTInfo.serializer.serialize(info, buffer); + buffer.flip(); + + GTInfo sInfo = GTInfo.serializer.deserialize(buffer); + this.compareTwoGTInfo(info,sInfo); + } + + private void compareTwoGTInfo(GTInfo info, GTInfo sInfo) { + Assert.assertEquals(info.tableName, sInfo.tableName); + Assert.assertEquals(info.primaryKey, sInfo.primaryKey); + + for (int i = 0; i < info.colTypes.length; i++) { + Assert.assertEquals(info.codeSystem.maxCodeLength(i), sInfo.codeSystem.maxCodeLength(i)); + Assert.assertTrue(info.codeSystem.maxCodeLength(i) > 0); + Assert.assertEquals(info.colRef(i), sInfo.colRef(i)); + } + Assert.assertArrayEquals(info.colBlocks, sInfo.colBlocks); + Assert.assertEquals(info.getRowBlockSize(), sInfo.getRowBlockSize()); + Assert.assertEquals(info.rowBlockSize, sInfo.rowBlockSize); + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java index 5454490..2505e00 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java @@ -78,7 +78,7 @@ public class SimpleGridTableTest { } private IGTScanner scan(GridTable table) throws IOException { - GTScanRequest req = new GTScanRequest(table.getInfo()); + GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, null); IGTScanner scanner = table.scan(req); for (GTRecord r : scanner) { Object[] v = r.getValues(); @@ -94,7 +94,7 @@ public class SimpleGridTableTest { } private IGTScanner scanAndAggregate(GridTable table) throws IOException { - GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null); + GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null, true, 0); IGTScanner scanner = table.scan(req); int i = 0; for (GTRecord r : scanner) { http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java index 288eb55..25b4e07 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java @@ -35,9 +35,9 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> { private static 
final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class); final DataType type; - final int maxLength; + transient final int maxLength; - int avoidVerbose = 0; + transient int avoidVerbose = 0; public BigDecimalSerializer(DataType type) { this.type = type; @@ -101,5 +101,5 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> { public BigDecimal valueOf(String str) { return new BigDecimal(str); } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java index 100dc8c..7cf0243 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java @@ -19,6 +19,7 @@ package org.apache.kylin.metadata.datatype; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -30,6 +31,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.util.BytesSerializer; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum; @@ -37,7 +40,7 @@ import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum; */ @SuppressWarnings("serial") public class DataType implements Serializable { - + private static final LinkedHashSet<String> VALID_TYPES = new LinkedHashSet<String>(); private static Pattern TYPE_PATTERN = null; @@ -129,6 +132,12 @@ public class DataType implements Serializable { private int precision; private int scale; + public 
DataType(String name, int precision, int scale) { + this.name = name; + this.precision = precision; + this.scale = scale; + } + private DataType(String datatype) { datatype = datatype.trim().toLowerCase(); datatype = replaceLegacy(datatype); @@ -295,4 +304,23 @@ public class DataType implements Serializable { else return name + "(" + precision + "," + scale + ")"; } + + public static final BytesSerializer<DataType> serializer = new BytesSerializer<DataType>() { + @Override + public void serialize(DataType value, ByteBuffer out) { + BytesUtil.writeUTFString(value.name, out); + BytesUtil.writeVInt(value.precision, out); + BytesUtil.writeVInt(value.scale, out); + + } + + @Override + public DataType deserialize(ByteBuffer in) { + String name = BytesUtil.readUTFString(in); + int precision = BytesUtil.readVInt(in); + int scale = BytesUtil.readVInt(in); + + return new DataType(name, precision, scale); + } + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java index 09e6e4b..9eebdbe 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java @@ -129,7 +129,7 @@ public class TableRecordInfoDigest { } public static byte[] serialize(TableRecordInfoDigest o) { - ByteBuffer buf = ByteBuffer.allocate(Serializer.SERIALIZE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); serializer.serialize(o, buf); byte[] result = new byte[buf.position()]; System.arraycopy(buf.array(), 0, result, 0, buf.position()); @@ -144,9 +144,7 @@ public 
class TableRecordInfoDigest { return serializer.deserialize(buffer); } - private static final Serializer serializer = new Serializer(); - - private static class Serializer implements BytesSerializer<TableRecordInfoDigest> { + private static final BytesSerializer<TableRecordInfoDigest> serializer = new BytesSerializer<TableRecordInfoDigest>() { @Override public void serialize(TableRecordInfoDigest value, ByteBuffer out) { @@ -173,5 +171,5 @@ public class TableRecordInfoDigest { return result; } - } + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorConstants.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorConstants.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorConstants.java deleted file mode 100644 index 5033d55..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorConstants.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.storage.hbase.common.coprocessor; - -/** - */ -public class CoprocessorConstants { - public static final int SERIALIZE_BUFFER_SIZE = 65536; - public static final int METRIC_SERIALIZE_BUFFER_SIZE = 65536; -} http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java index c37b2f4..e142536 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java @@ -71,7 +71,7 @@ public class CoprocessorProjector { } public static byte[] serialize(CoprocessorProjector o) { - ByteBuffer buf = ByteBuffer.allocate(CoprocessorConstants.SERIALIZE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); serializer.serialize(o, buf); byte[] result = new byte[buf.position()]; System.arraycopy(buf.array(), 0, result, 0, buf.position()); @@ -82,10 +82,7 @@ public class CoprocessorProjector { return serializer.deserialize(ByteBuffer.wrap(bytes)); } - private static final Serializer serializer = new Serializer(); - - private static class Serializer implements BytesSerializer<CoprocessorProjector> { - + private static final BytesSerializer<CoprocessorProjector> serializer = new BytesSerializer<CoprocessorProjector>() { @Override public void serialize(CoprocessorProjector value, ByteBuffer out) { BytesUtil.writeByteArray(value.groupByMask, out); @@ -98,7 +95,7 @@ public class CoprocessorProjector { boolean hasGroupBy = BytesUtil.readVInt(in) == 1; return new 
CoprocessorProjector(mask, hasGroupBy); } - } + }; // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java index 35488d1..a39798b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java @@ -46,7 +46,7 @@ public class CoprocessorRowType { for (int i = 0; i < cols.size(); i++) { colSizes[i] = tableRecordInfo.getDigest().length(i); } - + //TODO:check0 return new CoprocessorRowType(cols.toArray(new TblColRef[cols.size()]), colSizes, 0); } @@ -64,7 +64,7 @@ public class CoprocessorRowType { } public static byte[] serialize(CoprocessorRowType o) { - ByteBuffer buf = ByteBuffer.allocate(CoprocessorConstants.SERIALIZE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); serializer.serialize(o, buf); byte[] result = new byte[buf.position()]; System.arraycopy(buf.array(), 0, result, 0, buf.position()); @@ -75,9 +75,7 @@ public class CoprocessorRowType { return serializer.deserialize(ByteBuffer.wrap(bytes)); } - private static final Serializer serializer = new Serializer(); - - private static class Serializer implements BytesSerializer<CoprocessorRowType> { + private static final BytesSerializer<CoprocessorRowType> serializer = new BytesSerializer<CoprocessorRowType>() { @Override public void serialize(CoprocessorRowType o, ByteBuffer out) { @@ -113,7 +111,7 @@ public class CoprocessorRowType { } return new 
CoprocessorRowType(cols, colSizes, bodyOffset); } - } + }; // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java index 5b16b04..9e9dd6d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java @@ -36,7 +36,6 @@ import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; /** @@ -76,7 +75,7 @@ public class ObserverAggregators { } public static byte[] serialize(ObserverAggregators o) { - ByteBuffer buf = ByteBuffer.allocate(CoprocessorConstants.SERIALIZE_BUFFER_SIZE); + ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); serializer.serialize(o, buf); byte[] result = new byte[buf.position()]; System.arraycopy(buf.array(), 0, result, 0, buf.position()); @@ -87,9 +86,7 @@ public class ObserverAggregators { return serializer.deserialize(ByteBuffer.wrap(bytes)); } - private static final Serializer serializer = new Serializer(); - - private static class Serializer implements BytesSerializer<ObserverAggregators> { + private static final BytesSerializer<ObserverAggregators> serializer = new 
BytesSerializer<ObserverAggregators>() { @Override public void serialize(ObserverAggregators value, ByteBuffer out) { @@ -117,7 +114,7 @@ public class ObserverAggregators { return new ObserverAggregators(hcols); } - } + }; // ============================================================================ @@ -125,7 +122,7 @@ public class ObserverAggregators { final int nHCols; final ByteBuffer[] hColValues; final int nTotalMeasures; - + MeasureType measureTypes[]; public ObserverAggregators(HCol[] _hcols) { @@ -163,7 +160,7 @@ public class ObserverAggregators { measureTypes[i++] = MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j])); } } - + MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures]; for (int i = 0; i < nTotalMeasures; i++) { aggrs[i] = measureTypes[i].newAggregator(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 9ca3fc8..14bc851 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -42,11 +42,11 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import 
org.apache.kylin.cube.util.KryoUtils; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; @@ -153,9 +153,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build()); } - byte[] scanRequestBytes = KryoUtils.serialize(scanRequest); - final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes); - logger.info("Serialized scanRequestBytes's size is " + scanRequestBytes.length); + ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + GTScanRequest.serializer.serialize(scanRequest, buffer); + buffer.flip(); + final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit()); + logger.info("Serialized scanRequestBytes's size is " + (buffer.limit() - buffer.position())); final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); @@ -176,9 +178,13 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Future<?> future = executorService.submit(new Runnable() { @Override public void run() { - final byte[] rawScanBytes = KryoUtils.serialize(rawScan); + + ByteBuffer rawScanBuffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); + RawScan.serializer.serialize(rawScan, rawScanBuffer); + rawScanBuffer.flip(); + CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); - builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes)); + builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit())); for (IntList intList : hbaseColumnsToGTIntList) { builder.addHbaseColumnsToGT(intList); } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 477d32d..4a7a966 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -70,8 +70,8 @@ public abstract class CubeHBaseRPC { if (rawScan.endKey != null) { scan.setStopRow(rawScan.endKey); } - if (rawScan.fuzzyKey != null) { - applyFuzzyFilter(scan, rawScan.fuzzyKey); + if (rawScan.fuzzyKeys != null) { + applyFuzzyFilter(scan, rawScan.fuzzyKeys); } if (rawScan.hbaseColumns != null) { applyHBaseColums(scan, rawScan.hbaseColumns); @@ -257,8 +257,8 @@ public abstract class CubeHBaseRPC { info.append(rawScan.getEndKeyAsString()); info.append(" - "); info.append(Bytes.toStringBinary(rawScan.endKey)); - if (rawScan.fuzzyKey != null) { - info.append("\nFuzzy key counts: " + rawScan.fuzzyKey.size()); + if (rawScan.fuzzyKeys != null) { + info.append("\nFuzzy key counts: " + rawScan.fuzzyKeys.size()); info.append("\nFuzzy: "); info.append(rawScan.getFuzzyKeyAsString()); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index aadac3b..6e3e0d8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -45,7 +45,6 @@ public class CubeSegmentScanner implements IGTScanner { final CubeSegment cubeSeg; final GTInfo info; - final byte[] trimmedInfoBytes; final List<GTScanRequest> scanRequests; final Scanner scanner; final Cuboid cuboid; @@ -82,13 +81,9 @@ public class CubeSegmentScanner implements IGTScanner { scanRequests = Lists.newArrayListWithCapacity(scanRanges.size()); - trimmedInfoBytes = GTInfo.serialize(info); - GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes); - KylinConfig config = cubeSeg.getCubeInstance().getConfig(); for (GTScanRange range : scanRanges) { - GTScanRequest req = new GTScanRequest(trimmedInfo, range.replaceGTInfo(trimmedInfo), gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate); - req.setAggrCacheGB(config.getQueryCoprocessorMemGB()); // limit the memory usage inside coprocessor + GTScanRequest req = new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, config.getQueryCoprocessorMemGB()); scanRequests.add(req); } http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 5a14d40..e988058 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -72,7 +72,6 @@ public class CubeStorageQuery implements ICachableStorageQuery { Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation); Set<TblColRef> filterDimsD = expandDerived(filterDims, 
derivedPostAggregation); filterDimsD.removeAll(groupsD); - derivedPostAggregation.removeAll(groups); // identify cuboid Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ac2f56a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java index 9707a99..361b1dd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java @@ -18,27 +18,31 @@ package org.apache.kylin.storage.hbase.cube.v2; +import java.nio.ByteBuffer; import java.util.List; +import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; +import com.google.common.collect.Lists; + public class RawScan { public byte[] startKey; public byte[] endKey; public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns - public List<Pair<byte[], byte[]>> fuzzyKey; + public List<Pair<byte[], byte[]>> fuzzyKeys; public int hbaseCaching; public int hbaseMaxResultSize; public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, // - List<Pair<byte[], byte[]>> fuzzyKey, int hbaseCaching, int hbaseMaxResultSize) { + List<Pair<byte[], byte[]>> fuzzyKeys, int hbaseCaching, int hbaseMaxResultSize) { this.startKey = startKey; this.endKey = endKey; this.hbaseColumns = hbaseColumns; - this.fuzzyKey = fuzzyKey; + this.fuzzyKeys = fuzzyKeys; this.hbaseCaching = hbaseCaching; this.hbaseMaxResultSize = hbaseMaxResultSize; } @@ -53,7 +57,7 @@ public class RawScan { public String getFuzzyKeyAsString() { StringBuilder buf = new StringBuilder(); - for (Pair<byte[], 
byte[]> fuzzyKey : this.fuzzyKey) { + for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKeys) { buf.append(BytesUtil.toHex(fuzzyKey.getFirst())); buf.append(" "); buf.append(BytesUtil.toHex(fuzzyKey.getSecond())); @@ -62,4 +66,48 @@ public class RawScan { return buf.toString(); } + public static final BytesSerializer<RawScan> serializer = new BytesSerializer<RawScan>() { + @Override + public void serialize(RawScan value, ByteBuffer out) { + BytesUtil.writeByteArray(value.startKey, out); + BytesUtil.writeByteArray(value.endKey, out); + BytesUtil.writeVInt(value.hbaseColumns.size(), out); + for (Pair<byte[], byte[]> hbaseColumn : value.hbaseColumns) { + BytesUtil.writeByteArray(hbaseColumn.getFirst(), out); + BytesUtil.writeByteArray(hbaseColumn.getSecond(), out); + } + BytesUtil.writeVInt(value.fuzzyKeys.size(), out); + for (Pair<byte[], byte[]> fuzzyKey : value.fuzzyKeys) { + BytesUtil.writeByteArray(fuzzyKey.getFirst(), out); + BytesUtil.writeByteArray(fuzzyKey.getSecond(), out); + } + BytesUtil.writeVInt(value.hbaseCaching, out); + BytesUtil.writeVInt(value.hbaseMaxResultSize, out); + } + + @Override + public RawScan deserialize(ByteBuffer in) { + byte[] sStartKey = BytesUtil.readByteArray(in); + byte[] sEndKey = BytesUtil.readByteArray(in); + int hbaseColumnsSize = BytesUtil.readVInt(in); + List<Pair<byte[], byte[]>> sHbaseCoumns = Lists.newArrayListWithCapacity(hbaseColumnsSize); + for (int i = 0; i < hbaseColumnsSize; i++) { + byte[] a = BytesUtil.readByteArray(in); + byte[] b = BytesUtil.readByteArray(in); + sHbaseCoumns.add(Pair.newPair(a, b)); + } + + int fuzzyKeysSize = BytesUtil.readVInt(in); + List<Pair<byte[], byte[]>> sFuzzyKeys = Lists.newArrayListWithCapacity(fuzzyKeysSize); + for (int i = 0; i < fuzzyKeysSize; i++) { + byte[] a = BytesUtil.readByteArray(in); + byte[] b = BytesUtil.readByteArray(in); + sFuzzyKeys.add(Pair.newPair(a, b)); + } + int sHBaseCaching = BytesUtil.readVInt(in); + int sHBaseMaxResultSize = BytesUtil.readVInt(in); + return new 
RawScan(sStartKey, sEndKey, sHbaseCoumns, sFuzzyKeys, sHBaseCaching, sHBaseMaxResultSize); + } + }; + }