KYLIN-608: support HLL (HyperLogLog) counters in II (inverted-index) storage
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c5d329fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c5d329fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c5d329fe Branch: refs/heads/inverted-index Commit: c5d329fe098a0d4886f3e73a6ce0e99a621c8e67 Parents: 0a96d74 Author: honma <ho...@ebay.com> Authored: Thu Feb 12 10:36:37 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Thu Feb 12 10:36:37 2015 +0800 ---------------------------------------------------------------------- .../common/hll/HyperLogLogPlusCounter.java | 14 +- .../org/apache/kylin/common/util/BasicTest.java | 19 +- .../common/util/HyperLogLogCounterTest.java | 12 + .../invertedindex/index/RawTableRecord.java | 20 +- .../apache/kylin/invertedindex/index/Slice.java | 330 +++++++++---------- .../kylin/invertedindex/index/TableRecord.java | 12 +- .../measure/fixedlen/FixedHLLCodec.java | 14 +- .../measure/fixedlen/FixedLenMeasureCodec.java | 3 +- .../measure/fixedlen/FixedPointLongCodec.java | 6 +- .../endpoint/EndpointAggregators.java | 117 ++++--- .../endpoint/EndpointTupleIterator.java | 18 +- .../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +- .../org/apache/kylin/storage/tuple/Tuple.java | 12 +- 13 files changed, 325 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java index c323b90..49a6756 100644 --- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java +++ 
b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java @@ -28,10 +28,10 @@ import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import org.apache.commons.compress.utils.IOUtils; +import org.apache.kylin.common.util.BytesUtil; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; -import org.apache.kylin.common.util.BytesUtil; import com.ning.compress.lzf.LZFDecoder; import com.ning.compress.lzf.LZFEncoder; @@ -72,8 +72,12 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter } public void clear() { - for (int i = 0; i < m; i++) - registers[i] = 0; + byte zero = (byte) 0; + Arrays.fill(registers, zero); + } + + public void add(int value) { + add(hashFunc.hashInt(value).asLong()); } public void add(String value) { @@ -84,6 +88,10 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter add(hashFunc.hashBytes(value).asLong()); } + public void add(byte[] value, int offset, int length) { + add(hashFunc.hashBytes(value, offset, length).asLong()); + } + protected void add(long hash) { int bucketMask = m - 1; int bucket = (int) (hash & bucketMask); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 5952c33..0a33f9f 100644 --- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -22,17 +22,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; -import 
org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpException; -import org.apache.commons.httpclient.HttpStatus; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.params.HttpMethodParams; import org.junit.Ignore; import org.junit.Test; -import org.slf4j.*; -import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Created by honma on 10/17/14. @@ -59,12 +51,19 @@ public class BasicTest { System.out.printf("b"); } + private enum MetricType { + Count, DimensionAsMetric, DistinctCount, Normal + } + @Test @Ignore("convenient trial tool for dev") public void test1() throws Exception { + String x = MetricType.DimensionAsMetric.toString(); + System.out.println(x); + MetricType y = MetricType.valueOf(x); + System.out.println(y == MetricType.DimensionAsMetric); } - @Test @Ignore("fix it later") public void test2() throws IOException, ConfigurationException { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java index 088219f..a7d275a 100644 --- a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java +++ b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java @@ -204,6 +204,18 @@ public class HyperLogLogCounterTest { System.out.println("Perf test result: " + duration / 1000 + " seconds"); } + @Test + public void testEquivalence() { + byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 }; + byte[] b = new byte[] { 3, 4, 42 }; + HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter(); + HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter(); + ha.add(a, 1, 3); + hb.add(b); + + 
Assert.assertTrue(ha.getCountEstimate()==hb.getCountEstimate()); + } + private HyperLogLogPlusCounter newHLLC() { return new HyperLogLogPlusCounter(16); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java index 14ea62b..895fd4f 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java @@ -49,42 +49,44 @@ public class RawTableRecord implements Cloneable { Arrays.fill(buf, Dictionary.NULL); } - protected boolean isMetric(int col) { + public boolean isMetric(int col) { return digest.isMetrics(col); } - protected FixedLenMeasureCodec<LongWritable> codec(int col) { + public FixedLenMeasureCodec<LongWritable> codec(int col) { return digest.codec(col); } - protected int length(int col) { + public int length(int col) { return digest.length(col); } - protected int getColumnCount() { + public int getColumnCount() { return digest.getColumnCount(); } - protected void setValueID(int col, int id) { + public void setValueID(int col, int id) { BytesUtil.writeUnsigned(id, buf, digest.offset(col), digest.length(col)); } - protected int getValueID(int col) { + public int getValueID(int col) { return BytesUtil.readUnsigned(buf, digest.offset(col), digest.length(col)); } - protected void setValueMetrics(int col, LongWritable value) { + public void setValueMetrics(int col, LongWritable value) { digest.codec(col).write(value, buf, digest.offset(col)); } - protected LongWritable getValueMetrics(int col) { - return digest.codec(col).read(buf, digest.offset(col)); + public String getValueMetric(int col) { + 
digest.codec(col).read(buf, digest.offset(col)); + return (String) digest.codec(col).getValue(); } public byte[] getBytes() { return buf; } + //TODO is it possible to avoid copying? public void setBytes(byte[] bytes, int offset, int length) { assert buf.length == length; System.arraycopy(bytes, offset, buf, 0, length); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java index fef9892..59dd9cd 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java @@ -31,171 +31,169 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; */ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> { - TableRecordInfoDigest info; - int nColumns; - - short shard; - long timestamp; - int nRecords; - ColumnValueContainer[] containers; - - public Slice(TableRecordInfoDigest digest, short shard, long timestamp, - ColumnValueContainer[] containers) { - this.info = digest; - this.nColumns = digest.getColumnCount(); - - this.shard = shard; - this.timestamp = timestamp; - this.nRecords = containers[0].getSize(); - this.containers = containers; - - assert nColumns == containers.length; - for (int i = 0; i < nColumns; i++) { - assert nRecords == containers[i].getSize(); - } - } - - public int getRecordCount() { - return this.nRecords; - } - - public short getShard() { - return shard; - } - - public long getTimestamp() { - return timestamp; - } - - public ColumnValueContainer[] getColumnValueContainers() { - return containers; - } - - public ColumnValueContainer getColumnValueContainer(int col) { - return containers[col]; - } - - public 
Iterator<RawTableRecord> iterateWithBitmap( - final ConciseSet resultBitMap) { - if (resultBitMap == null) { - return this.iterator(); - } else { - return new Iterator<RawTableRecord>() { - int i = 0; - int iteratedCount = 0; - int resultSize = resultBitMap.size(); - - RawTableRecord rec = info.createTableRecordBytes(); - ImmutableBytesWritable temp = new ImmutableBytesWritable(); - - @Override - public boolean hasNext() { - return iteratedCount < resultSize; - } - - @Override - public RawTableRecord next() { - while (!resultBitMap.contains(i)) { - i++; - } - for (int col = 0; col < nColumns; col++) { - containers[col].getValueAt(i, temp); - rec.setValueBytes(col, temp); - } - iteratedCount++; - i++; - - return rec; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }; - } - } - - @Override - public Iterator<RawTableRecord> iterator() { - return new Iterator<RawTableRecord>() { - int i = 0; - RawTableRecord rec = info.createTableRecordBytes(); - ImmutableBytesWritable temp = new ImmutableBytesWritable(); - - @Override - public boolean hasNext() { - return i < nRecords; - } - - @Override - public RawTableRecord next() { - for (int col = 0; col < nColumns; col++) { - containers[col].getValueAt(i, temp); - rec.setValueBytes(col, temp); - } - i++; - return rec; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((info == null) ? 
0 : info.hashCode()); - result = prime * result + shard; - result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Slice other = (Slice) obj; - if (info == null) { - if (other.info != null) - return false; - } else if (!info.equals(other.info)) - return false; - if (shard != other.shard) - return false; - if (timestamp != other.timestamp) - return false; - return true; - } - - @Override - public int compareTo(Slice o) { - int comp = this.shard - o.shard; - if (comp != 0) - return comp; - - comp = (int) (this.timestamp - o.timestamp); - return comp; - } + TableRecordInfoDigest info; + int nColumns; + + short shard; + long timestamp; + int nRecords; + ColumnValueContainer[] containers; + + public Slice(TableRecordInfoDigest digest, short shard, long timestamp, ColumnValueContainer[] containers) { + this.info = digest; + this.nColumns = digest.getColumnCount(); + + this.shard = shard; + this.timestamp = timestamp; + this.nRecords = containers[0].getSize(); + this.containers = containers; + + assert nColumns == containers.length; + for (int i = 0; i < nColumns; i++) { + assert nRecords == containers[i].getSize(); + } + } + + public int getRecordCount() { + return this.nRecords; + } + + public short getShard() { + return shard; + } + + public long getTimestamp() { + return timestamp; + } + + public ColumnValueContainer[] getColumnValueContainers() { + return containers; + } + + public ColumnValueContainer getColumnValueContainer(int col) { + return containers[col]; + } + + public Iterator<RawTableRecord> iterateWithBitmap(final ConciseSet resultBitMap) { + if (resultBitMap == null) { + return this.iterator(); + } else { + final RawTableRecord rec = info.createTableRecordBytes(); + final 
ImmutableBytesWritable temp = new ImmutableBytesWritable(); + + return new Iterator<RawTableRecord>() { + int i = 0; + int iteratedCount = 0; + int resultSize = resultBitMap.size(); + + @Override + public boolean hasNext() { + return iteratedCount < resultSize; + } + + @Override + public RawTableRecord next() { + while (!resultBitMap.contains(i)) { + i++; + } + for (int col = 0; col < nColumns; col++) { + containers[col].getValueAt(i, temp); + rec.setValueBytes(col, temp); + } + iteratedCount++; + i++; + + return rec; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + } + + @Override + public Iterator<RawTableRecord> iterator() { + return new Iterator<RawTableRecord>() { + int i = 0; + RawTableRecord rec = info.createTableRecordBytes(); + ImmutableBytesWritable temp = new ImmutableBytesWritable(); + + @Override + public boolean hasNext() { + return i < nRecords; + } + + @Override + public RawTableRecord next() { + for (int col = 0; col < nColumns; col++) { + containers[col].getValueAt(i, temp); + rec.setValueBytes(col, temp); + } + i++; + return rec; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((info == null) ? 
0 : info.hashCode()); + result = prime * result + shard; + result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Slice other = (Slice) obj; + if (info == null) { + if (other.info != null) + return false; + } else if (!info.equals(other.info)) + return false; + if (shard != other.shard) + return false; + if (timestamp != other.timestamp) + return false; + return true; + } + + @Override + public int compareTo(Slice o) { + int comp = this.shard - o.shard; + if (comp != 0) + return comp; + + comp = (int) (this.timestamp - o.timestamp); + return comp; + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java index 1abbe18..3b8d969 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java @@ -73,7 +73,7 @@ public class TableRecord implements Cloneable { return rawRecord.length(col); } - public List<String> getValueList() { + public List<String> getOriginTableColumnValues() { List<String> ret = Lists.newArrayList(); for (int i = 0; i < info.nColumns; ++i) { ret.add(getValueString(i)); @@ -91,9 +91,13 @@ public class TableRecord implements Cloneable { } } + /** + * get value of columns which belongs to the original table columns. + * i.e. 
columns like min_xx, max_yy will never appear + */ public String getValueString(int col) { if (rawRecord.isMetric(col)) - return rawRecord.codec(col).toString(getValueMetrics(col)); + return getValueMetric(col); else return info.dict(col).getValueFromId(rawRecord.getValueID(col)); } @@ -106,8 +110,8 @@ public class TableRecord implements Cloneable { rawRecord.setValueMetrics(col, value); } - private LongWritable getValueMetrics(int col) { - return rawRecord.getValueMetrics(col); + private String getValueMetric(int col) { + return rawRecord.getValueMetric(col); } public short getShard() { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java index 9f6a1ba..d787cbc 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java @@ -1,11 +1,10 @@ package org.apache.kylin.metadata.measure.fixedlen; +import java.nio.ByteBuffer; + import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.measure.HLLCSerializer; import org.apache.kylin.metadata.model.DataType; -import java.util.Map; - /** * Created by Hongbin Ma(Binmahone) on 2/10/15. 
*/ @@ -42,17 +41,18 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> } @Override - public String toString(HyperLogLogPlusCounter value) { - return String.valueOf(value.getCountEstimate()); + public Object getValue() { + return current; } @Override public HyperLogLogPlusCounter read(byte[] buf, int offset) { - return serializer.deserialize(); + current.readRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset)); + return current; } @Override public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) { - + current.writeRegisters(ByteBuffer.wrap(buf, offset, buf.length - offset)); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java index 41a6356..650432a 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java @@ -33,7 +33,8 @@ abstract public class FixedLenMeasureCodec<T> { abstract public T valueOf(String value); - abstract public String toString(T value); + + abstract public Object getValue(); abstract public T read(byte[] buf, int offset); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java index f27e446..9ccb479 100644 --- 
a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java @@ -61,11 +61,11 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> { } @Override - public String toString(LongWritable value) { + public String getValue() { if (scale == 0) - return value.toString(); + return current.toString(); else - return "" + (new BigDecimal(value.get()).divide(scalePower)); + return "" + (new BigDecimal(current.get()).divide(scalePower)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java index b199862..c6d8c49 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java @@ -18,23 +18,24 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint; -import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.util.List; -import com.yammer.metrics.core.Metric; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.invertedindex.index.RawTableRecord; import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; import 
org.apache.kylin.metadata.measure.MeasureAggregator; import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec; import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants; -import org.apache.hadoop.io.LongWritable; -import java.nio.ByteBuffer; -import java.util.List; +import com.google.common.collect.Lists; /** * @author honma @@ -49,6 +50,13 @@ public class EndpointAggregators { private static class MetricInfo { private MetricType type; private int refIndex = -1; + private int presision = -1; + + public MetricInfo(MetricType type, int refIndex, int presision) { + this.type = type; + this.refIndex = refIndex; + this.presision = presision; + } public MetricInfo(MetricType type, int refIndex) { this.type = type; @@ -58,6 +66,7 @@ public class EndpointAggregators { public MetricInfo(MetricType type) { this.type = type; } + } public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) { @@ -83,7 +92,7 @@ public class EndpointAggregators { } if (functionDesc.isCountDistinct()) { - metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index); + metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision()); } else { metricInfos[i] = new MetricInfo(MetricType.Normal, index); } @@ -96,8 +105,11 @@ public class EndpointAggregators { final String[] funcNames; final String[] dataTypes; final MetricInfo[] metricInfos; - final TableRecordInfoDigest tableRecordInfo; + final transient TableRecordInfoDigest tableRecordInfoDigest; + final transient RawTableRecord rawTableRecord; + final transient ImmutableBytesWritable byteBuffer; + final transient HyperLogLogPlusCounter[] hllcs; final transient FixedLenMeasureCodec[] measureSerializers; final transient Object[] metricValues; @@ -107,8 +119,11 @@ public class 
EndpointAggregators { this.funcNames = funcNames; this.dataTypes = dataTypes; this.metricInfos = metricInfos; - this.tableRecordInfo = tableInfo; + this.tableRecordInfoDigest = tableInfo; + this.rawTableRecord = tableInfo.createTableRecordBytes(); + this.byteBuffer = new ImmutableBytesWritable(); + this.hllcs = new HyperLogLogPlusCounter[this.metricInfos.length]; this.metricValues = new Object[funcNames.length]; this.measureSerializers = new FixedLenMeasureCodec[funcNames.length]; for (int i = 0; i < this.measureSerializers.length; ++i) { @@ -116,8 +131,8 @@ public class EndpointAggregators { } } - public TableRecordInfoDigest getTableRecordInfo() { - return tableRecordInfo; + public TableRecordInfoDigest getTableRecordInfoDigest() { + return tableRecordInfoDigest; } public boolean isEmpty() { @@ -133,35 +148,41 @@ public class EndpointAggregators { return aggrs; } + /** + * this method is heavily called at coprocessor side, + * Make sure as little object creation as possible + */ public void aggregate(MeasureAggregator[] measureAggrs, byte[] row) { - int rawIndex = 0; - int columnCount = tableRecordInfo.getColumnCount(); - - for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) { - for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) { - if (metricInfos[metricIndex].refIndex == columnIndex) { - if (metricInfos[metricIndex].type == MetricType.Normal) { - //normal column values to aggregate - measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex)); - } else if (metricInfos[metricIndex].type == MetricType.DistinctCount) { - if (tableRecordInfo.isMetrics(columnCount)) { - measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex)); - } else { - //TODO: for unified dictionary, this is okay. 
but if different data blocks uses different dictionary, we'll have to aggregate original data - measureAggrs[metricIndex].aggregate(tableRecordInfo.); - } - } + + rawTableRecord.setBytes(row, 0, row.length); + + for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) { + + MetricInfo metricInfo = metricInfos[metricIndex]; + MeasureAggregator aggregator = measureAggrs[metricIndex]; + FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex]; + + //get the raw bytes + rawTableRecord.getValueBytes(metricInfo.refIndex, byteBuffer); + + if (metricInfo.type == MetricType.Normal) { + aggregator.aggregate(measureSerializer.read(byteBuffer.get(), byteBuffer.getOffset())); + } else if (metricInfo.type == MetricType.DistinctCount) { + //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data + HyperLogLogPlusCounter hllc = hllcs[metricIndex]; + if (hllc == null) { + hllc = new HyperLogLogPlusCounter(metricInfo.presision); } + hllc.clear(); + hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength()); + aggregator.aggregate(hllc); } - rawIndex += tableRecordInfo.length(columnIndex); } //aggregate for "count" for (int i = 0; i < metricInfos.length; ++i) { if (metricInfos[i].type == MetricType.Count) { measureAggrs[i].aggregate(ONE); - } else if (metricInfos[i].type == MetricType.DistinctCount) { - } } } @@ -184,11 +205,12 @@ public class EndpointAggregators { return metricBytesOffset; } - public List<String> deserializeMetricValues(byte[] metricBytes, int offset) { - List<String> ret = Lists.newArrayList(); + public List<Object> deserializeMetricValues(byte[] metricBytes, int offset) { + List<Object> ret = Lists.newArrayList(); int metricBytesOffset = offset; for (int i = 0; i < measureSerializers.length; i++) { - String valueString = measureSerializers[i].toString(measureSerializers[i].read(metricBytes, metricBytesOffset)); + 
measureSerializers[i].read(metricBytes, metricBytesOffset); + Object valueString = measureSerializers[i].getValue(); metricBytesOffset += measureSerializers[i].getLength(); ret.add(valueString); } @@ -215,18 +237,37 @@ public class EndpointAggregators { public void serialize(EndpointAggregators value, ByteBuffer out) { BytesUtil.writeAsciiStringArray(value.funcNames, out); BytesUtil.writeAsciiStringArray(value.dataTypes, out); - BytesUtil.writeIntArray(value.metricInfos, out); - BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfo), out); + + BytesUtil.writeVInt(value.metricInfos.length, out); + for (int i = 0; i < value.metricInfos.length; ++i) { + MetricInfo metricInfo = value.metricInfos[i]; + BytesUtil.writeAsciiString(metricInfo.type.toString(), out); + BytesUtil.writeVInt(metricInfo.refIndex, out); + BytesUtil.writeVInt(metricInfo.presision, out); + } + + BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out); } @Override public EndpointAggregators deserialize(ByteBuffer in) { + String[] funcNames = BytesUtil.readAsciiStringArray(in); String[] dataTypes = BytesUtil.readAsciiStringArray(in); - int[] refColIndex = BytesUtil.readIntArray(in); + + int metricInfoLength = BytesUtil.readVInt(in); + MetricInfo[] infos = new MetricInfo[metricInfoLength]; + for (int i = 0; i < infos.length; ++i) { + MetricType type = MetricType.valueOf(BytesUtil.readAsciiString(in)); + int refIndex = BytesUtil.readVInt(in); + int presision = BytesUtil.readVInt(in); + infos[i] = new MetricInfo(type, refIndex, presision); + } + byte[] temp = BytesUtil.readByteArray(in); TableRecordInfoDigest tableInfo = TableRecordInfoDigest.deserialize(temp); - return new EndpointAggregators(funcNames, dataTypes, refColIndex, tableInfo); + + return new EndpointAggregators(funcNames, dataTypes, infos, tableInfo); } } 
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java index d63bc0d..465f7f3 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java @@ -266,7 +266,7 @@ public class EndpointTupleIterator implements ITupleIterator { //not thread safe! private TableRecord tableRecord; - private List<String> measureValues; + private List<Object> measureValues; private Tuple tuple; public SingleRegionTupleIterator(List<IIProtos.IIResponse.IIRow> rows) { @@ -305,26 +305,32 @@ public class EndpointTupleIterator implements ITupleIterator { } - private ITuple makeTuple(TableRecord tableRecord, List<String> measureValues) { + private ITuple makeTuple(TableRecord tableRecord, List<Object> measureValues) { // groups - List<String> columnValues = tableRecord.getValueList(); + List<String> columnValues = tableRecord.getOriginTableColumnValues(); for (int i = 0; i < columnNames.size(); i++) { TblColRef column = columns.get(i); if (!tuple.hasColumn(column)) { continue; } - tuple.setValue(columnNames.get(i), columnValues.get(i)); + tuple.setDimensionValue(columnNames.get(i), columnValues.get(i)); } if (measureValues != null) { for (int i = 0; i < measures.size(); ++i) { if (!measures.get(i).isAppliedOnDimension()) { - tuple.setValue(measures.get(i).getRewriteFieldName(), measureValues.get(i)); + String fieldName = measures.get(i).getRewriteFieldName(); + Object value = measureValues.get(i); + String dataType = 
tuple.getDataType(fieldName); + //TODO: currently in II all metrics except HLLC is returned as String + if (dataType.toLowerCase().equalsIgnoreCase("hllc")) { + value = Tuple.convertOptiqCellValue((String) value, dataType); + } + tuple.setMeasureValue(fieldName, value); } } } return tuple; } - } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java index db68803..4852e3b 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java @@ -85,7 +85,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop aggregators = EndpointAggregators.deserialize(request.getAggregator().toByteArray()); filter = CoprocessorFilter.deserialize(request.getFilter().toByteArray()); - TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfo(); + TableRecordInfoDigest tableRecordInfoDigest = aggregators.getTableRecordInfoDigest(); IIProtos.IIResponse response = null; RegionScanner innerScanner = null; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c5d329fe/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java index 7b5fe1f..dd19e0c 100644 --- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java +++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java @@ -78,18 +78,17 @@ public class 
Tuple implements ITuple { return values[index]; } + public String getDataType(String fieldName) { + return info.getDataType(fieldName); + } + private void setFieldObjectValue(String fieldName, Object fieldValue) { int index = info.getFieldIndex(fieldName); values[index] = fieldValue; } - public void setValue(String fieldName, String fieldValue) { - this.setDimensionValue(fieldName, fieldValue); - } - public void setDimensionValue(String fieldName, String fieldValue) { - String dataType = info.getDataType(fieldName); - Object objectValue = convertOptiqCellValue(fieldValue, dataType); + Object objectValue = convertOptiqCellValue(fieldValue, getDataType(fieldName)); setFieldObjectValue(fieldName, objectValue); } @@ -121,6 +120,7 @@ public class Tuple implements ITuple { return sb.toString(); } + public static Object convertOptiqCellValue(String strValue, String dataType) { if (strValue == null) return null;