streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a6a9d940
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a6a9d940
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a6a9d940

Branch: refs/heads/0.8.0
Commit: a6a9d940ca220f9c94d4c4ff6cc3a2cccb617c0b
Parents: cf25daa
Author: qianhao.zhou <qianz...@ebay.com>
Authored: Fri May 22 17:13:25 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800

----------------------------------------------------------------------
 .../common/persistence/HBaseResourceStore.java  |  35 +-
 .../org/apache/kylin/common/util/TimeUtil.java  |  13 +-
 .../apache/kylin/common/util/TimeUtilTest.java  |  25 +-
 .../job/hadoop/cubev2/IGTRecordWriter.java      |  11 -
 .../job/hadoop/cubev2/InMemCubeBuilder.java     | 567 ------------------
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   1 +
 .../hadoop/cubev2/MapContextGTRecordWriter.java |   1 +
 .../kylin/job/streaming/CubeStreamBuilder.java  | 403 +++++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java |  96 +++-
 .../kylin/job/BuildCubeWithStreamTest.java      |   4 +-
 .../job/ITKafkaBasedIIStreamBuilderTest.java    |  32 +-
 .../apache/kylin/job/InMemCubeBuilderTest.java  |   4 +-
 .../cubev2/InMemCubeBuilderBenchmarkTest.java   | 119 ----
 .../job/streaming/CubeStreamBuilderTest.java    |  76 +++
 .../kylin/storage/StorageEngineFactory.java     |   3 +-
 .../org/apache/kylin/streaming/KafkaConfig.java |  12 +-
 .../kylin/streaming/SEOJsonStreamParser.java    | 100 ++++
 .../apache/kylin/streaming/StreamBuilder.java   |  20 +-
 .../kylin/streaming/cube/IGTRecordWriter.java   |  11 +
 .../kylin/streaming/cube/InMemCubeBuilder.java  | 567 +++++++++++++++++++
 .../invertedindex/IIStreamBuilder.java          |  15 +-
 .../cube/InMemCubeBuilderBenchmarkTest.java     | 117 ++++
 .../invertedindex/PrintOutStreamBuilder.java    |  70 ---
 23 files changed, 1437 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index 2868368..e665298 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -18,34 +18,27 @@ package org.apache.kylin.common.persistence;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.kylin.common.util.Bytes;
-
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 public class HBaseResourceStore extends ResourceStore {
 
     private static final String DEFAULT_TABLE_NAME = "kylin_metadata";
@@ -197,8 +190,10 @@ public class HBaseResourceStore extends ResourceStore {
             Put put = buildPut(resPath, newTS, row, content, table);
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            if (!ok)
-                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + getResourceTimestamp(resPath));
+            if (!ok) {
+                long real = getResourceTimestamp(resPath);
+                throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real);
+            }
 
             table.flushCommits();
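
The checkAndPut hunk above is the optimistic-concurrency path of the metadata store: the Put is applied only if the stored timestamp cell still equals the caller's expected old value, so a false return means a concurrent writer updated the resource in between. The only behavioral change in the hunk is that the current timestamp is read into a local variable before the exception message is built, instead of being fetched inside the string concatenation. A minimal sketch of the same compare-and-swap idiom against the HBase 0.98-era HTableInterface API (the table layout, family and qualifier names below are illustrative stand-ins, not Kylin's actual schema):

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TimestampCasSketch {
    // Hypothetical family/qualifier names used only for this sketch.
    private static final byte[] FAMILY = Bytes.toBytes("f");
    private static final byte[] TS_COL = Bytes.toBytes("ts");

    /** Returns true if the row was updated; false means another writer changed it first. */
    public static boolean casUpdate(HTableInterface table, byte[] row,
                                    long expectedOldTS, long newTS, byte[] content) throws IOException {
        Put put = new Put(row);
        put.add(FAMILY, TS_COL, Bytes.toBytes(newTS));
        put.add(FAMILY, Bytes.toBytes("content"), content);
        // Atomic compare-and-swap on the timestamp cell; no lock is held between read and write.
        boolean ok = table.checkAndPut(row, FAMILY, TS_COL, Bytes.toBytes(expectedOldTS), put);
        if (ok) {
            table.flushCommits();
        }
        return ok;
    }
}
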

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 26e3e06..0aa58e4 100644
--- a/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -3,8 +3,13 @@ package org.apache.kylin.common.util;
 /**
  */
 public class TimeUtil {
-    private static int ONE_MINUTE_TS = 60 * 1000;
-    private static int ONE_HOUR_TS = 60 * 60 * 1000;
+    public enum NormalizedTimeUnit {
+        MINUTE, HOUR, DAY
+    }
+
+    private static long ONE_MINUTE_TS = 60 * 1000;
+    private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+    private static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
 
     public static long getMinuteStart(long ts) {
         return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
@@ -13,4 +18,8 @@ public class TimeUtil {
     public static long getHourStart(long ts) {
         return ts / ONE_HOUR_TS * ONE_HOUR_TS;
     }
+
+    public static long getDayStart(long ts) {
+        return ts / ONE_DAY_TS * ONE_DAY_TS;
+    }
 }
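
The new getDayStart follows the same pattern as the existing helpers: plain integer division on epoch milliseconds, so it truncates to the start of the enclosing minute, hour, or UTC day without involving a Calendar or timezone. The constants are also widened from int to long, with ONE_HOUR_TS and ONE_DAY_TS now derived from ONE_MINUTE_TS. A short illustrative usage (the timestamp value is arbitrary):

import org.apache.kylin.common.util.TimeUtil;

public class TimeUtilUsageSketch {
    public static void main(String[] args) {
        long ts = 1432285432000L;                        // arbitrary epoch millis
        long minuteStart = TimeUtil.getMinuteStart(ts);  // ts - (ts % 60_000)
        long hourStart = TimeUtil.getHourStart(ts);      // ts - (ts % 3_600_000)
        long dayStart = TimeUtil.getDayStart(ts);        // ts - (ts % 86_400_000), i.e. midnight UTC
        System.out.println(minuteStart + " " + hourStart + " " + dayStart);
    }
}
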

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
index 90a0c40..cfa11d8 100644
--- a/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java
@@ -1,24 +1,25 @@
 package org.apache.kylin.common.util;
 
+import org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 /**
  */
 public class TimeUtilTest {
-    public static long normalizeTime(long timeMillis, NormalizeUnit unit) {
+    public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) {
         Calendar a = Calendar.getInstance();
         Calendar b = Calendar.getInstance();
         b.clear();
         a.setTimeInMillis(timeMillis);
-        if (unit == NormalizeUnit.MINUTE) {
+        if (unit == NormalizedTimeUnit.MINUTE) {
             b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), a.get(Calendar.MINUTE));
-        } else if (unit == NormalizeUnit.HOUR) {
+        } else if (unit == NormalizedTimeUnit.HOUR) {
             b.set(a.get(Calendar.YEAR), a.get(Calendar.MONTH), a.get(Calendar.DAY_OF_MONTH), a.get(Calendar.HOUR_OF_DAY), 0);
         }
         return b.getTimeInMillis();
@@ -29,15 +30,13 @@ public class TimeUtilTest {
         java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
 
         long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
-        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.HOUR), TimeUtil.getHourStart(t1));
-        Assert.assertEquals(normalizeTime(t1, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
+        Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t1));
 
         long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
-        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.HOUR), TimeUtil.getHourStart(t2));
-        Assert.assertEquals(normalizeTime(t2, NormalizeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
+        Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));
     }
 
-    public enum NormalizeUnit {
-        MINUTE, HOUR
-    }
+
 }


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
deleted file mode 100644
index cccc995..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/IGTRecordWriter.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.kylin.job.hadoop.cubev2;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface IGTRecordWriter {
-    void write(Long cuboidId, GTRecord record) throws IOException;
-}


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
deleted file mode 100644
index 56989a6..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. 
You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ -package org.apache.kylin.job.hadoop.cubev2; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.util.StringUtils; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.measure.MeasureCodec; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.cube.CubeGridTable; -import org.apache.kylin.storage.gridtable.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; - -/** - */ -public class InMemCubeBuilder implements Runnable { - - private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class); - private static final int DEFAULT_TIMEOUT = 25; - - private BlockingQueue<List<String>> queue; - private CubeDesc desc = null; - private long baseCuboidId; - private CuboidScheduler cuboidScheduler = null; - private Map<TblColRef, Dictionary<?>> dictionaryMap = null; - private CubeJoinedFlatTableDesc intermediateTableDesc; - private MeasureCodec measureCodec; - private String[] metricsAggrFuncs = null; - private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on; - public static final LongWritable ONE = new LongWritable(1l); - private int[] hbaseMeasureRefIndex; - private MeasureDesc[] measureDescs; - private int measureCount; - - protected IGTRecordWriter gtRecordWriter; - - - /** - * @param queue - * @param cube - * @param dictionaryMap - * @param gtRecordWriter - */ - public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) { - if (dictionaryMap == null || dictionaryMap.isEmpty()) { - throw new IllegalArgumentException(); - } - this.queue = queue; - this.desc = cube.getDescriptor(); - this.cuboidScheduler = new CuboidScheduler(desc); - this.dictionaryMap = dictionaryMap; - this.gtRecordWriter = gtRecordWriter; - this.baseCuboidId = Cuboid.getBaseCuboidId(desc); - this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null); - this.measureCodec = new MeasureCodec(desc.getMeasures()); - - Map<String, Integer> measureIndexMap = Maps.newHashMap(); - 
List<String> metricsAggrFuncsList = Lists.newArrayList(); - measureCount = desc.getMeasures().size(); - - List<MeasureDesc> measureDescsList = Lists.newArrayList(); - hbaseMeasureRefIndex = new int[measureCount]; - int measureRef = 0; - for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) { - for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) { - for (MeasureDesc measure : hbaseColDesc.getMeasures()) { - for (int j = 0; j < measureCount; j++) { - if (desc.getMeasures().get(j).equals(measure)) { - measureDescsList.add(measure); - hbaseMeasureRefIndex[measureRef] = j; - break; - } - } - measureRef++; - } - } - } - - for (int i = 0; i < measureCount; i++) { - MeasureDesc measureDesc = measureDescsList.get(i); - metricsAggrFuncsList.add(measureDesc.getFunction().getExpression()); - measureIndexMap.put(measureDesc.getName(), i); - } - this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]); - - this.dependentMeasures = Maps.newHashMap(); - for (int i = 0; i < measureCount; i++) { - String depMsrRef = measureDescsList.get(i).getDependentMeasureRef(); - if (depMsrRef != null) { - int index = measureIndexMap.get(depMsrRef); - dependentMeasures.put(i, index); - } - } - - this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]); - } - - - private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) { - GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap); - GTComboStore store = new GTComboStore(info, memStore); - GridTable gridTable = new GridTable(info, store); - return gridTable; - } - - private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException { - Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId); - BitSet parentDimensions = columnBitSets.getFirst(); - BitSet measureColumns = columnBitSets.getSecond(); - BitSet childDimensions = (BitSet) parentDimensions.clone(); - - long mask = Long.highestOneBit(parentCuboidId); - long childCuboidId = cuboidId; - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId); - int index = 0; - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & parentCuboidId) > 0) { - if ((mask & childCuboidId) == 0) { - // this dim will be aggregated - childDimensions.set(index, false); - } - index++; - } - mask = mask >> 1; - } - - return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns); - - } - - private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException { - GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null); - IGTScanner scanner = gridTable.scan(req); - GridTable newGridTable = newGridTableByCuboidID(cuboidId, true); - GTBuilder builder = newGridTable.rebuild(); - - BitSet allNeededColumns = new BitSet(); - allNeededColumns.or(aggregationColumns); - allNeededColumns.or(measureColumns); - - GTRecord newRecord = new GTRecord(newGridTable.getInfo()); - int counter = 0; - ByteArray byteArray = new ByteArray(8); - ByteBuffer byteBuffer = ByteBuffer.allocate(8); - try { - BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality()); - for (Integer i : dependentMeasures.keySet()) { - dependentMetrics.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i))); - } - - Object[] hllObjects = new 
Object[dependentMeasures.keySet().size()]; - - for (GTRecord record : scanner) { - counter++; - for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) { - newRecord.set(index, record.get(i)); - } - - if(dependentMeasures.size() > 0) { - // update measures which have 'dependent_measure_ref' - newRecord.getValues(dependentMetrics, hllObjects); - - for (Integer i : dependentMeasures.keySet()) { - for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) { - if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) { - assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed - - byteBuffer.clear(); - BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer); - byteArray.set(byteBuffer.array(), 0, byteBuffer.position()); - newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray); - } - } - - } - } - - builder.write(newRecord); - } - } finally { - builder.close(); - } - logger.info("Cuboid " + cuboidId + " has rows: " + counter); - - return newGridTable; - } - - private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) { - BitSet bitSet = BitSet.valueOf(new long[]{cuboidId}); - BitSet dimension = new BitSet(); - dimension.set(0, bitSet.cardinality()); - BitSet metrics = new BitSet(); - metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount); - return new Pair<BitSet, BitSet>(dimension, metrics); - } - - private Object[] buildKey(List<String> row) { - int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length; - Object[] key = new Object[keySize]; - - for (int i = 0; i < keySize; i++) { - key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]); - } - - return key; - } - - private Object[] buildValue(List<String> row) { - - Object[] values = new Object[measureCount]; - MeasureDesc measureDesc = null; - - for (int position = 0; position < hbaseMeasureRefIndex.length; position++) { - int i = hbaseMeasureRefIndex[position]; - measureDesc = measureDescs[i]; - - Object value = null; - int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i]; - FunctionDesc function = desc.getMeasures().get(i).getFunction(); - if (function.isCount() || function.isHolisticCountDistinct()) { - // note for holistic count distinct, this value will be ignored - value = ONE; - } else if (flatTableIdx == null) { - value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue()); - } else if (flatTableIdx.length == 1) { - value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0]))); - } else { - - byte[] result = null; - for (int x = 0; x < flatTableIdx.length; x++) { - byte[] split = Bytes.toBytes(row.get(flatTableIdx[x])); - if (result == null) { - result = Arrays.copyOf(split, split.length); - } else { - byte[] newResult = new byte[result.length + split.length]; - System.arraycopy(result, 0, newResult, 0, result.length); - System.arraycopy(split, 0, newResult, result.length, split.length); - result = newResult; - } - } - value = measureCodec.getSerializer(i).valueOf(result); - } - values[position] = value; - } - return values; - } - - - @Override - public void run() { - try { - logger.info("Create base cuboid " + baseCuboidId); - final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false); - - GTBuilder baseGTBuilder = baseCuboidGT.rebuild(); - 
final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo()); - - IGTScanner queueScanner = new IGTScanner() { - - @Override - public Iterator<GTRecord> iterator() { - return new Iterator<GTRecord>() { - - List<String> currentObject = null; - - @Override - public boolean hasNext() { - try { - currentObject = queue.take(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return currentObject != null && currentObject.size() > 0; - } - - @Override - public GTRecord next() { - if (currentObject.size() == 0) - throw new IllegalStateException(); - - buildGTRecord(currentObject, baseGTRecord); - return baseGTRecord; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void close() throws IOException { - } - - @Override - public GTInfo getInfo() { - return baseCuboidGT.getInfo(); - } - - @Override - public int getScannedRowCount() { - return 0; - } - - @Override - public int getScannedRowBlockCount() { - return 0; - } - }; - - Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId); - GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null); - IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req); - - int counter = 0; - for (GTRecord r : aggregationScanner) { - baseGTBuilder.write(r); - counter++; - } - baseGTBuilder.close(); - aggregationScanner.close(); - - logger.info("Base cuboid has " + counter + " rows;"); - SimpleGridTableTree tree = new SimpleGridTableTree(); - tree.data = baseCuboidGT; - tree.id = baseCuboidId; - tree.parent = null; - if (counter > 0) { - List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId); - Collections.sort(children); - for (Long childId : children) { - createNDCuboidGT(tree, baseCuboidId, childId); - } - } - outputGT(baseCuboidId, baseCuboidGT); - dropStore(baseCuboidGT); - - } catch (IOException e) { - logger.error("Fail to build cube", e); - throw new RuntimeException(e); - } - - } - - private void buildGTRecord(List<String> row, GTRecord record) { - - Object[] dimensions = buildKey(row); - Object[] metricsValues = buildValue(row); - Object[] recordValues = new Object[dimensions.length + metricsValues.length]; - System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); - System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length); - record.setValues(recordValues); - } - - private boolean gc(TreeNode<GridTable> parentNode) { - final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList(); - logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables)); - for (TreeNode<GridTable> gridTable : gridTables) { - final GTComboStore store = (GTComboStore) gridTable.data.getStore(); - if (store.memoryUsage() > 0) { - logger.info("cuboid id:" + gridTable.id + " flush to disk"); - long t = System.currentTimeMillis(); - store.switchToDiskStore(); - logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms"); - waitForGc(); - return true; - } - } - logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk"); - return false; - - } - - private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) { - final ExecutorService executorService = Executors.newSingleThreadExecutor(); - final Future<GridTable> task = 
executorService.submit(new Callable<GridTable>() { - @Override - public GridTable call() throws Exception { - return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId); - } - }); - try { - final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS); - return gridTable; - } catch (InterruptedException e) { - throw new RuntimeException("this should not happen", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof OutOfMemoryError) { - logger.warn("Future.get() OutOfMemory, stop the thread"); - } else { - throw new RuntimeException("this should not happen", e); - } - } catch (TimeoutException e) { - logger.warn("Future.get() timeout, stop the thread"); - } - logger.info("shutdown executor service"); - final List<Runnable> runnables = executorService.shutdownNow(); - try { - executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS); - waitForGc(); - } catch (InterruptedException e) { - throw new RuntimeException("this should not happen", e); - } - return null; - - } - - private void waitForGc() { - System.gc(); - logger.info("wait 5 seconds for gc"); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException("should not happen", e); - } - } - - private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException { - - long startTime = System.currentTimeMillis(); - -// GTComboStore parentStore = (GTComboStore) parentNode.data.getStore(); -// if (parentStore.memoryUsage() <= 0) { -// long t = System.currentTimeMillis(); -// parentStore.switchToMemStore(); -// logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms"); -// } - - GridTable currentCuboid; - while (true) { - logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId); - currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId); - if (currentCuboid != null) { - break; - } else { - logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc"); - if (gc(parentNode)) { - continue; - } else { - logger.warn("all parent node has been flushed into disk, memory is still insufficient"); - throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient"); - } - } - } - SimpleGridTableTree node = new SimpleGridTableTree(); - node.parent = parentNode; - node.data = currentCuboid; - node.id = cuboidId; - parentNode.children.add(node); - - logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms"); - - List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId); - if (!children.isEmpty()) { - Collections.sort(children); // sort cuboids - for (Long childId : children) { - createNDCuboidGT(node, cuboidId, childId); - } - } - - - //output the grid table - outputGT(cuboidId, currentCuboid); - dropStore(currentCuboid); - parentNode.children.remove(node); - if (parentNode.children.size() > 0) { - logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store"); - ((GTComboStore) parentNode.data.getStore()).switchToMemStore(); - } - } - - private void dropStore(GridTable gt) throws IOException { - ((GTComboStore) gt.getStore()).drop(); - } - - - private void outputGT(Long cuboidId, GridTable gridTable) throws IOException { - long startTime = System.currentTimeMillis(); - GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null); - IGTScanner 
scanner = gridTable.scan(req); - for (GTRecord record : scanner) { - this.gtRecordWriter.write(cuboidId, record); - } - logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms"); - } - - private static class TreeNode<T> { - T data; - long id; - TreeNode<T> parent; - List<TreeNode<T>> children = Lists.newArrayList(); - - List<TreeNode<T>> getAncestorList() { - ArrayList<TreeNode<T>> result = Lists.newArrayList(); - TreeNode<T> parent = this; - while (parent != null) { - result.add(parent); - parent = parent.parent; - } - return Lists.reverse(result); - } - - @Override - public String toString() { - return id + ""; - } - } - - private static class SimpleGridTableTree extends TreeNode<GridTable> { - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java index 4454e43..4efff16 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java @@ -20,6 +20,7 @@ import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.streaming.cube.InMemCubeBuilder; import java.io.IOException; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java index b8e1ffe..283bed6 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java @@ -16,6 +16,7 @@ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.gridtable.GTRecord; +import org.apache.kylin.streaming.cube.IGTRecordWriter; import java.io.IOException; import java.nio.ByteBuffer; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java new file mode 100644 index 0000000..82892dc --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java @@ -0,0 +1,403 @@ +package org.apache.kylin.job.streaming; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; 
+import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.HBaseColumnDesc; +import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; +import org.apache.kylin.dict.Dictionary; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.lookup.ReadableTable; +import org.apache.kylin.dict.lookup.TableSignature; +import org.apache.kylin.job.constant.BatchConstants; +import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer; +import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator; +import org.apache.kylin.job.hadoop.hbase.CreateHTableJob; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.cube.CuboidToGridTableMapping; +import org.apache.kylin.storage.gridtable.GTRecord; +import org.apache.kylin.streaming.SEOJsonStreamParser; +import org.apache.kylin.streaming.StreamBuilder; +import org.apache.kylin.streaming.StreamMessage; +import org.apache.kylin.streaming.cube.IGTRecordWriter; +import org.apache.kylin.streaming.cube.InMemCubeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +/** + */ +public class CubeStreamBuilder extends StreamBuilder { + + private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilder.class); + + private final CubeManager cubeManager; + private final String cubeName; + private final KylinConfig kylinConfig; + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + + public CubeStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, String cubeName) { + super(streamMessageQueue); + this.kylinConfig = KylinConfig.getInstanceFromEnv(); + this.cubeManager = CubeManager.getInstance(kylinConfig); + this.cubeName = cubeName; + setStreamParser(new SEOJsonStreamParser(cubeManager.getCube(cubeName).getAllColumns())); + } + + @Override + protected void build(List<StreamMessage> streamMessages) throws Exception { + if (CollectionUtils.isEmpty(streamMessages)) { + logger.info("nothing to build, skip to next iteration"); + return; 
+ } + final List<List<String>> parsedStreamMessages = parseStream(streamMessages); + long startOffset = streamMessages.get(0).getOffset(); + long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset(); + LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages); + blockingQueue.put(Collections.<String>emptyList()); + + final CubeInstance cubeInstance = cubeManager.getCube(cubeName); + final CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis()); + final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages); + + final Configuration conf = HadoopUtil.getCurrentConfiguration(); + final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString(); + FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100); + ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0); + + final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages); + writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset); + + final HTableInterface hTable = createHTable(cubeSegment); + + final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable); + InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance, + dictionaryMap, gtRecordWriter); + + executorService.submit(inMemCubeBuilder).get(); + gtRecordWriter.flush(); + commitSegment(cubeSegment); + } + + private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) { + for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) { + final TblColRef tblColRef = entry.getKey(); + final Dictionary<?> dictionary = entry.getValue(); + TableSignature signature = new TableSignature(); + signature.setLastModifiedTime(System.currentTimeMillis()); + signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset)); + signature.setSize(endOffset - startOffset); + DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), + tblColRef.getName(), + tblColRef.getColumnDesc().getZeroBasedIndex(), + tblColRef.getDatatype(), + signature, + ReadableTable.DELIM_AUTO); + logger.info("writing dictionary for TblColRef:" + tblColRef.toString()); + DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig); + try { + cubeSegment.putDictResPath(tblColRef, dictionaryManager.trySaveNewDict(dictionary, dictInfo).getResourcePath()); + } catch (IOException e) { + logger.error("error save dictionary for column:" + tblColRef, e); + throw new RuntimeException("error save dictionary for column:" + tblColRef, e); + } + } + } + + private class CubeStreamRecordWriter implements IGTRecordWriter { + final List<InMemKeyValueCreator> keyValueCreators; + final int nColumns; + final HTableInterface hTable; + private final ByteBuffer byteBuffer; + private final CubeDesc cubeDesc; + private List<Put> puts = Lists.newArrayList(); + + private CubeStreamRecordWriter(CubeDesc cubeDesc, HTableInterface hTable) { + this.keyValueCreators = Lists.newArrayList(); + this.cubeDesc = cubeDesc; + int startPosition = 0; + for (HBaseColumnFamilyDesc cfDesc : 
cubeDesc.getHBaseMapping().getColumnFamily()) { + for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { + keyValueCreators.add(new InMemKeyValueCreator(colDesc, startPosition)); + startPosition += colDesc.getMeasures().length; + } + } + this.nColumns = keyValueCreators.size(); + this.hTable = hTable; + this.byteBuffer = ByteBuffer.allocate(1<<20); + } + + private byte[] copy(byte[] array, int offset, int length) { + byte[] result = new byte[length]; + System.arraycopy(array, offset, result, 0, length); + return result; + } + + private ByteBuffer createKey(Long cuboidId, GTRecord record) { + byteBuffer.clear(); + byteBuffer.put(Bytes.toBytes(cuboidId)); + final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality(); + for (int i = 0; i < cardinality; i++) { + final ByteArray byteArray = record.get(i); + byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length()); + } + return byteBuffer; + } + + @Override + public void write(Long cuboidId, GTRecord record) throws IOException { + final ByteBuffer key = createKey(cuboidId, record); + final CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeDesc, cuboidId)); + final BitSet bitSet = new BitSet(); + bitSet.set(mapping.getDimensionCount(), mapping.getColumnCount()); + for (int i = 0; i < nColumns; i++) { + final KeyValue keyValue = keyValueCreators.get(i).create(key.array(), 0, key.position(), record.getValues(bitSet, new Object[bitSet.cardinality()])); + final Put put = new Put(copy(key.array(), 0, key.position())); + byte[] family = copy(keyValue.getFamilyArray(), keyValue.getFamilyOffset(), keyValue.getFamilyLength()); + byte[] qualifier = copy(keyValue.getQualifierArray(), keyValue.getQualifierOffset(), keyValue.getQualifierLength()); + byte[] value = copy(keyValue.getValueArray(), keyValue.getValueOffset(), keyValue.getValueLength()); + put.add(family, qualifier, value); + puts.add(put); + } + if (puts.size() >= batchSize()) { + flush(); + } + } + + public final void flush() { + try { + if (!puts.isEmpty()) { + long t = System.currentTimeMillis(); + if (hTable != null) { + hTable.put(puts); + hTable.flushCommits(); + } + logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); + puts.clear(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private Map<Integer, TblColRef> getTblColRefMap(CubeInstance cubeInstance) { + final List<TblColRef> columns = cubeInstance.getAllColumns(); + final List<TblColRef> allDimensions = cubeInstance.getAllDimensions(); + final HashMap<Integer, TblColRef> result = Maps.newHashMap(); + for (int i = 0; i < columns.size(); i++) { + final TblColRef tblColRef = columns.get(i); + if (allDimensions.contains(tblColRef)) { + result.put(i, tblColRef); + } + } + return result; + } + + private Map<TblColRef, Dictionary<?>> buildDictionary(final Map<Integer, TblColRef> tblColRefMap, List<List<String>> recordList) throws IOException { + HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap(); + + HashMultimap<TblColRef, String> valueMap = HashMultimap.create(); + for (List<String> row : recordList) { + for (int i = 0; i < row.size(); i++) { + String cell = row.get(i); + if (tblColRefMap.containsKey(i)) { + valueMap.put(tblColRefMap.get(i), cell); + } + } + } + for (TblColRef tblColRef : valueMap.keySet()) { + final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() { + @Nullable + @Override + public byte[] apply(String 
input) { + return input == null ? null : input.getBytes(); + } + }); + final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes); + result.put(tblColRef, dict); + } + return result; + } + + private Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, List<List<String>> streams) { + CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null); + final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; + final List<Long> allCuboidIds = getAllCuboidIds(cubeDesc); + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap(); + + + Lists.transform(allCuboidIds, new Function<Long, Integer[]>() { + @Nullable + @Override + public Integer[] apply(@Nullable Long cuboidId) { + BitSet bitSet = BitSet.valueOf(new long[]{cuboidId}); + Integer[] result = new Integer[bitSet.cardinality()]; + + long mask = Long.highestOneBit(baseCuboidId); + int position = 0; + for (int i = 0; i < rowkeyLength; i++) { + if ((mask & cuboidId) > 0) { + result[position] = i; + position++; + } + mask = mask >> 1; + } + return result; + } + }); + final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); + for (Long cuboidId : allCuboidIds) { + result.put(cuboidId, new HyperLogLogPlusCounter(14)); + BitSet bitSet = BitSet.valueOf(new long[]{cuboidId}); + Integer[] cuboidBitSet = new Integer[bitSet.cardinality()]; + + long mask = Long.highestOneBit(baseCuboidId); + int position = 0; + for (int i = 0; i < rowkeyLength; i++) { + if ((mask & cuboidId) > 0) { + cuboidBitSet[position] = i; + position++; + } + mask = mask >> 1; + } + allCuboidsBitSet.put(cuboidId, cuboidBitSet); + } + + HashFunction hf = Hashing.murmur3_32(); + ByteArray[] row_hashcodes = new ByteArray[rowkeyLength]; + for (int i = 0; i < rowkeyLength; i++) { + row_hashcodes[i] = new ByteArray(); + } + for (List<String> row : streams) { + //generate hash for each row key column + for (int i = 0; i < rowkeyLength; i++) { + Hasher hc = hf.newHasher(); + final String cell = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]); + if (cell != null) { + row_hashcodes[i].set(hc.putString(cell).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } + + for (Map.Entry<Long, HyperLogLogPlusCounter> longHyperLogLogPlusCounterEntry : result.entrySet()) { + Long cuboidId = longHyperLogLogPlusCounterEntry.getKey(); + HyperLogLogPlusCounter counter = longHyperLogLogPlusCounterEntry.getValue(); + Hasher hc = hf.newHasher(); + final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId); + for (int position = 0; position < cuboidBitSet.length; position++) { + hc.putBytes(row_hashcodes[cuboidBitSet[position]].array()); + } + counter.add(hc.hash().asBytes()); + } + } + return result; + } + + private void commitSegment(CubeSegment cubeSegment) throws IOException { + cubeSegment.setStatus(SegmentStatusEnum.READY); + CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true); + } + + private List<Long> getAllCuboidIds(CubeDesc cubeDesc) { + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + List<Long> result = Lists.newArrayList(); + CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); + getSubCuboidIds(cuboidScheduler, baseCuboidId, result); + return result; + } + + private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) { + 
result.add(parentCuboidId); + for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) { + getSubCuboidIds(cuboidScheduler, cuboidId, result); + } + } + + + private List<List<String>> parseStream(List<StreamMessage> streamMessages) { + return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() { + @Nullable + @Override + public List<String> apply(StreamMessage input) { + return getStreamParser().parse(input); + } + }); + } + + private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception { + final String hTableName = cubeSegment.getStorageLocationIdentifier(); + String[] args = new String[]{"-cubename", cubeName, + "-segmentname", cubeSegment.getName(), + "-input", "/empty", + "-htablename", hTableName, + "-statisticsenabled", "true"}; + ToolRunner.run(new CreateHTableJob(), args); + final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); + logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); + return hTable; + } + + private void loadToHTable(String hTableName) throws IOException { + final HTableInterface table = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); + + } + + @Override + protected void onStop() { + + } + + @Override + protected int batchInterval() { + return 30 * 60 * 1000;//30 min + } + + @Override + protected int batchSize() { + return 1000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java index 7854fd5..3929098 100644 --- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java +++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java @@ -35,6 +35,7 @@ package org.apache.kylin.job.streaming; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import kafka.api.OffsetRequest; import kafka.cluster.Broker; @@ -42,20 +43,22 @@ import kafka.javaapi.PartitionMetadata; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.streaming.*; import org.apache.kylin.streaming.invertedindex.IIStreamBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; /** */ @@ -65,14 +68,12 @@ public class StreamingBootstrap { private KylinConfig kylinConfig; private StreamingManager streamingManager; - private IIManager iiManager; private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap(); private 
StreamingBootstrap(KylinConfig kylinConfig) { this.kylinConfig = kylinConfig; this.streamingManager = StreamingManager.getInstance(kylinConfig); - this.iiManager = IIManager.getInstance(kylinConfig); } public static StreamingBootstrap getInstance(KylinConfig kylinConfig) { @@ -115,10 +116,73 @@ public class StreamingBootstrap { public void start(String streaming, int partitionId) throws Exception { final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming); Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming); - final IIInstance ii = iiManager.getII(kafkaConfig.getIiName()); - Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName()); + final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaConfig).getPartitionIds().size(); Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId); + + if (!StringUtils.isEmpty(kafkaConfig.getIiName())) { + startIIStreaming(kafkaConfig, partitionId, partitionCount); + } else if (!StringUtils.isEmpty(kafkaConfig.getCubeName())) { + startCubeStreaming(kafkaConfig, partitionId, partitionCount); + } else { + throw new IllegalArgumentException("no cube or ii in kafka config"); + } + } + + private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) { + List<BlockingQueue<StreamMessage>> result = Lists.newArrayList(); + for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) { + final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId); + long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0); + final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig); + streamingOffset = Math.max(streamingOffset, latestOffset); + KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, + streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1); + Executors.newSingleThreadExecutor().submit(consumer); + result.add(consumer.getStreamQueue(0)); + } + return result; + } + + private void startCubeStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception { + final String cubeName = kafkaConfig.getCubeName(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + + final List<BlockingQueue<StreamMessage>> queues = consume(kafkaConfig, partitionCount); + final LinkedBlockingDeque<StreamMessage> streamQueue = new LinkedBlockingDeque<>(); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + while (true) { + for (BlockingQueue<StreamMessage> queue : queues) { + try { + streamQueue.put(queue.take()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + }); + CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName); + cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns())); + final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder); + future.get(); + } + + private StreamParser getStreamParser(KafkaConfig kafkaConfig, List<TblColRef> columns) throws Exception { + if (!StringUtils.isEmpty(kafkaConfig.getParserName())) { + Class clazz = Class.forName(kafkaConfig.getParserName()); + Constructor constructor = clazz.getConstructor(List.class); + return (StreamParser) 
constructor.newInstance(columns); + } else { + return new JsonStreamParser(columns); + } + } + + private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception { + final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName()); + Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName()); Preconditions.checkArgument(ii.getSegments().size() > 0); final IISegment iiSegment = ii.getSegments().get(0); @@ -129,7 +193,8 @@ public class StreamingBootstrap { final int parallelism = shard / partitionCount; final int startShard = partitionId * parallelism; final int endShard = startShard + parallelism; - long streamingOffset = getEarliestStreamingOffset(streaming, startShard, endShard); + + long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), startShard, endShard); streamingOffset = streamingOffset - (streamingOffset % parallelism); logger.info("offset from ii desc is " + streamingOffset); final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig); @@ -143,21 +208,14 @@ public class StreamingBootstrap { } KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism); - kafkaConsumers.put(getKey(streaming, partitionId), consumer); + kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer); + - StreamParser parser; - if (!StringUtils.isEmpty(kafkaConfig.getParserName())) { - Class clazz = Class.forName(kafkaConfig.getParserName()); - Constructor constructor = clazz.getConstructor(List.class); - parser = (StreamParser) constructor.newInstance(ii.getDescriptor().listAllColumns()); - } else { - parser = new JsonStreamParser(ii.getDescriptor().listAllColumns()); - } Executors.newSingleThreadExecutor().submit(consumer); final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism); for (int i = startShard; i < endShard; ++i) { - final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), streaming, iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i); - task.setStreamParser(parser); + final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(i % parallelism), kafkaConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiSegment.getIIDesc(), i); + task.setStreamParser(getStreamParser(kafkaConfig, ii.getDescriptor().listAllColumns())); if (i == endShard - 1) { streamingBuilderPool.submit(task).get(); } else { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java index a437bba..319d7fa 100644 --- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java +++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java @@ -55,11 +55,11 @@ import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.lookup.HiveTableReader; -import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter; -import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder; import 
org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.gridtable.GTRecord; +import org.apache.kylin.streaming.cube.IGTRecordWriter; +import org.apache.kylin.streaming.cube.InMemCubeBuilder; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java index 2432220..71c9644 100644 --- a/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java +++ b/job/src/test/java/org/apache/kylin/job/ITKafkaBasedIIStreamBuilderTest.java @@ -34,19 +34,19 @@ package org.apache.kylin.job; -import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.job.hadoop.cube.StorageCleanupJob; import org.apache.kylin.job.streaming.StreamingBootstrap; -import org.junit.*; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; /** */ @@ -63,30 +63,6 @@ public class ITKafkaBasedIIStreamBuilderTest { System.setProperty("hdp.version", "2.2.4.2-2"); // mapred-site.xml ref this } - @AfterClass - public static void afterClass() throws Exception { -// backup(); - } - - private static void backup() throws Exception { - int exitCode = cleanupOldStorage(); - if (exitCode == 0) { - exportHBaseData(); - } - } - - private static int cleanupOldStorage() throws Exception { - String[] args = {"--delete", "true"}; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - - private static void exportHBaseData() throws IOException { - ExportHBaseData export = new ExportHBaseData(); - export.exportTables(); - } - @Before public void before() throws Exception { HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java index c4bda5f..781273d 100644 --- a/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java +++ b/job/src/test/java/org/apache/kylin/job/InMemCubeBuilderTest.java @@ -38,10 +38,10 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.lookup.FileTableReader; -import org.apache.kylin.job.hadoop.cubev2.IGTRecordWriter; -import org.apache.kylin.job.hadoop.cubev2.InMemCubeBuilder; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.storage.gridtable.GTRecord; +import org.apache.kylin.streaming.cube.IGTRecordWriter; +import org.apache.kylin.streaming.cube.InMemCubeBuilder; import org.junit.After; import 
org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java deleted file mode 100644 index f61aa66..0000000 --- a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java +++ /dev/null @@ -1,119 +0,0 @@ -package org.apache.kylin.job.hadoop.cubev2; - -import com.google.common.collect.Maps; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.DimensionDesc; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.gridtable.GTRecord; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; - -/** - */ -@Ignore -public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase { - - private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class); - - private static final int BENCHMARK_RECORD_LIMIT = 2000000; - private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment"; - - @Before - public void setUp() throws Exception { - this.createTestMetadata(); - } - - @After - public void after() throws Exception { - this.cleanupTestMetadata(); - } - - private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) { - final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap(); - final CubeDesc desc = cubeSegment.getCubeDesc(); - for (DimensionDesc dim : desc.getDimensions()) { - // dictionary - for (TblColRef col : dim.getColumnRefs()) { - if (desc.getRowkey().isUseDictionary(col)) { - Dictionary dict = cubeSegment.getDictionary(col); - if (dict == null) { - throw new IllegalArgumentException("Dictionary for " + col + " was not found."); - } - logger.info("Dictionary for " + col + " was put into dictionary map."); - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } - } - } - return dictionaryMap; - } - - private static class ConsoleGTRecordWriter implements IGTRecordWriter { - - boolean verbose = false; - - @Override - public void write(Long cuboidId, GTRecord record) throws IOException { - if (verbose) - System.out.println(record.toString()); - } - } - - private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException { - BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt"))); - String line; - int counter = 0; - while ((line = br.readLine()) != null) { - 
queue.put(Arrays.asList(line.split("\t"))); - counter++; - if (counter == BENCHMARK_RECORD_LIMIT) { - break; - } - } - queue.put(Collections.emptyList()); - } - - private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException { - queue.put(Collections.emptyList()); - } - - - @Test - public void test() throws Exception { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final CubeManager cubeManager = CubeManager.getInstance(kylinConfig); - final CubeInstance cube = cubeManager.getCube(CUBE_NAME); - final CubeSegment cubeSegment = cube.getFirstSegment(); - - LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>(); - - InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter()); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - Future<?> future = executorService.submit(cubeBuilder); - loadDataFromLocalFile(queue); - future.get(); - logger.info("stream build finished"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java new file mode 100644 index 0000000..9d62fda --- /dev/null +++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java @@ -0,0 +1,76 @@ +package org.apache.kylin.job.streaming; + +import org.apache.hadoop.util.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.streaming.StreamMessage; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; + +/** + */ +public class CubeStreamBuilderTest { + + private static final Logger logger = LoggerFactory.getLogger(CubeStreamBuilderTest.class); + + private KylinConfig kylinConfig; + + private static final String CUBE_NAME = "test_kylin_cube_without_slr_left_join_ready"; + + @BeforeClass + public static void beforeClass() throws Exception { + ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + System.setProperty("hdp.version", "2.2.0.0-2041"); // mapred-site.xml ref this + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME); + cube.getSegments().clear(); + CubeManager.getInstance(kylinConfig).updateCube(cube, true); + + } + + @Test + public void test() throws Exception { + LinkedBlockingDeque<StreamMessage> queue = new 
LinkedBlockingDeque<>(); + CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(queue, CUBE_NAME); + final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder); + loadDataFromLocalFile(queue, 100000); + future.get(); + } + + private void loadDataFromLocalFile(BlockingQueue<StreamMessage> queue, final int maxCount) throws IOException, InterruptedException { + BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt"))); + String line; + int count = 0; + while ((line = br.readLine()) != null && count++ < maxCount) { + final List<String> strings = Arrays.asList(line.split("\t")); + queue.put(new StreamMessage(System.currentTimeMillis(), StringUtils.join(",", strings).getBytes())); + } + queue.put(StreamMessage.EOF); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java index f880b73..0d720ab 100644 --- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java +++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java @@ -29,6 +29,7 @@ import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.storage.cache.CacheFledgedDynamicStorageEngine; import org.apache.kylin.storage.cache.CacheFledgedStaticStorageEngine; +import org.apache.kylin.storage.hbase.CubeStorageEngine; import org.apache.kylin.storage.hbase.InvertedIndexStorageEngine; import org.apache.kylin.storage.hybrid.HybridInstance; import org.apache.kylin.storage.hybrid.HybridStorageEngine; @@ -51,7 +52,7 @@ public class StorageEngineFactory { return ret; } } else if (realization.getType() == RealizationType.CUBE) { - ICachableStorageEngine ret = new org.apache.kylin.storage.cube.CubeStorageEngine((CubeInstance) realization); + ICachableStorageEngine ret = new CubeStorageEngine((CubeInstance) realization); if (allowStorageLayerCache) { return wrapWithCache(ret, realization); } else { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java index 9949c96..b6f5025 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java @@ -75,6 +75,9 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("iiName") private String iiName; + @JsonProperty("cubeName") + private String cubeName; + @JsonProperty("parserName") private String parserName; @@ -86,7 +89,6 @@ public class KafkaConfig extends RootPersistentEntity { this.parserName = parserName; } - public int getTimeout() { return timeout; } @@ -133,6 +135,14 @@ public class KafkaConfig extends RootPersistentEntity { }); } + public String getCubeName() { + return cubeName; + } + + public void setCubeName(String cubeName) { + this.cubeName = cubeName; + } + public String getIiName() { return iiName; } 
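
Note on the KafkaConfig hunk above: it adds a cubeName property next to the existing iiName, so one streaming descriptor can point a Kafka topic at either an inverted-index build or a cube build. Below is a minimal, hypothetical routing sketch; the helper routeStreaming and the method startCubeStreaming are illustrative assumptions and not part of this commit, while startIIStreaming, getCubeName, getIiName and getName come from the hunks above.

// Hypothetical routing helper inside StreamingBootstrap -- illustration only.
// It relies only on the KafkaConfig getters introduced or kept by this commit.
private void routeStreaming(KafkaConfig kafkaConfig, int partitionId, int partitionCount) throws Exception {
    String cubeName = kafkaConfig.getCubeName();
    String iiName = kafkaConfig.getIiName();
    if (cubeName != null && !cubeName.isEmpty()) {
        // assumed cube counterpart of startIIStreaming; not part of this commit
        startCubeStreaming(kafkaConfig, partitionId, partitionCount);
    } else if (iiName != null && !iiName.isEmpty()) {
        startIIStreaming(kafkaConfig, partitionId, partitionCount);
    } else {
        throw new IllegalArgumentException("kafka config " + kafkaConfig.getName() + " has neither cubeName nor iiName");
    }
}
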
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java new file mode 100644 index 0000000..5dab7f9 --- /dev/null +++ b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java @@ -0,0 +1,100 @@ +/* + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + +package org.apache.kylin.streaming; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.google.common.collect.Lists; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.TimeUtil; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +public final class SEOJsonStreamParser implements StreamParser { + + private static final Logger logger = LoggerFactory.getLogger(SEOJsonStreamParser.class); + + private final List<TblColRef> allColumns; + private final ObjectMapper mapper = new ObjectMapper(); + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); + + public SEOJsonStreamParser(List<TblColRef> allColumns) { + this.allColumns = allColumns; + } + + @Override + public List<String> parse(StreamMessage stream) { + try { + Map<String, String> root = mapper.readValue(stream.getRawData(), mapType); + String trafficSource = root.get("trafficsourceid"); + if ("20".equals(trafficSource) || "21".equals(trafficSource) || "22".equals(trafficSource) || "23".equals(trafficSource)) { + ArrayList<String> result = Lists.newArrayList(); + for (TblColRef column : allColumns) { + String columnName = column.getName(); + if (columnName.equalsIgnoreCase("minute_start")) { + result.add(String.valueOf(TimeUtil.getMinuteStart(Long.valueOf(root.get("timestamp"))))); + } else if (columnName.equalsIgnoreCase("hour_start")) { + result.add(String.valueOf(TimeUtil.getHourStart(Long.valueOf(root.get("timestamp"))))); + } else if (columnName.equalsIgnoreCase("day")) { + //of day start we'll add yyyy-mm-dd + long ts = TimeUtil.getDayStart(Long.valueOf(root.get("timestamp"))); + result.add(DateFormat.formatToDateStr(ts)); + } else { + String x = 
root.get(columnName.toLowerCase()); + result.add(x); + } + } + + return result; + } else { + return null; + } + } catch (IOException e) { + logger.error("error parsing:" + new String(stream.getRawData()), e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java index 67cb109..e9cb046 100644 --- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java +++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java @@ -48,16 +48,14 @@ public abstract class StreamBuilder implements Runnable { private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class); - private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000; - private final int sliceSize; + private StreamParser streamParser = StringStreamParser.instance; private BlockingQueue<StreamMessage> streamMessageQueue; private long lastBuildTime = System.currentTimeMillis(); - public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize) { + public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) { this.streamMessageQueue = streamMessageQueue; - this.sliceSize = sliceSize; } protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception; @@ -81,10 +79,11 @@ public abstract class StreamBuilder implements Runnable { logger.warn("stream queue interrupted", e); continue; } - if (streamMessage == null) { - - logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size()); - if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) { + if (streamMessage == null || getStreamParser().parse(streamMessage) == null) { + if (streamMessage == null) { + logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size()); + } + if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) { build(streamMessageToBuild); clearCounter(); streamMessageToBuild.clear(); @@ -98,7 +97,7 @@ public abstract class StreamBuilder implements Runnable { } } streamMessageToBuild.add(streamMessage); - if (streamMessageToBuild.size() >= this.sliceSize) { + if (streamMessageToBuild.size() >= batchSize()) { build(streamMessageToBuild); clearCounter(); streamMessageToBuild.clear(); @@ -117,4 +116,7 @@ public abstract class StreamBuilder implements Runnable { public final void setStreamParser(StreamParser streamParser) { this.streamParser = streamParser; } + + protected abstract int batchInterval(); + protected abstract int batchSize(); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java new file mode 100644 index 0000000..2d1e97e --- /dev/null +++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/IGTRecordWriter.java @@ -0,0 +1,11 @@ +package org.apache.kylin.streaming.cube; + +import org.apache.kylin.storage.gridtable.GTRecord; + +import java.io.IOException; + +/** + */ +public 
interface IGTRecordWriter { + void write(Long cuboidId, GTRecord record) throws IOException; +}
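
Note on the StreamBuilder and IGTRecordWriter hunks above: StreamBuilder no longer takes a sliceSize constructor argument or a fixed build-interval constant; a subclass now supplies batchSize() and batchInterval(), and a slice is flushed either when enough messages have accumulated or when the interval has elapsed since the last build. The sketch below is an illustrative subclass under that contract, not code from this commit; the constructor signature and the three abstract methods are taken from the StreamBuilder hunk, while the class name and the constant values are assumptions.

package org.apache.kylin.streaming;

import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * Illustrative subclass of the refactored StreamBuilder -- not part of this commit.
 * Slice size and flush interval now come from the subclass via batchSize() and
 * batchInterval() instead of a constructor argument.
 */
public class LoggingStreamBuilder extends StreamBuilder {

    public LoggingStreamBuilder(BlockingQueue<StreamMessage> queue) {
        super(queue);
    }

    @Override
    protected void build(List<StreamMessage> streamsToBuild) throws Exception {
        // a real builder (IIStreamBuilder, CubeStreamBuilder) would persist this slice
        System.out.println("flushing a slice of " + streamsToBuild.size() + " messages");
    }

    @Override
    protected int batchInterval() {
        return 2 * 60 * 1000; // same value as the removed BATCH_BUILD_INTERVAL_THRESHOLD
    }

    @Override
    protected int batchSize() {
        return 10000; // illustrative slice size
    }
}

Driving such a builder follows the pattern CubeStreamBuilderTest uses above: submit it to an executor, feed the BlockingQueue with StreamMessage instances, and finish with StreamMessage.EOF. Records emitted per cuboid go through the new IGTRecordWriter interface, whose simplest form is the console writer that the removed InMemCubeBuilderBenchmarkTest carried.
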