http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
new file mode 100644
index 0000000..e5a5b5f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/InMemCubeBuilder.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cube.CubeGridTable;
+import org.apache.kylin.storage.gridtable.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Builds a cube in memory: consumes flat-table rows from a blocking queue, builds the
+ * base cuboid, then recursively aggregates child cuboids, spilling grid tables to disk
+ * when memory runs short.
+ */
+public class InMemCubeBuilder implements Runnable {
+
+    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+    private static final int DEFAULT_TIMEOUT = 25;
+
+    private BlockingQueue<List<String>> queue;
+    private CubeDesc desc = null;
+    private long baseCuboidId;
+    private CuboidScheduler cuboidScheduler = null;
+    private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+    private MeasureCodec measureCodec;
+    private String[] metricsAggrFuncs = null;
+    private Map<Integer, Integer> dependentMeasures = null; // key: index of a measure that depends on another measure; value: index of the measure it depends on
+    public static final LongWritable ONE = new LongWritable(1L);
+    private int[] hbaseMeasureRefIndex;
+    private MeasureDesc[] measureDescs;
+    private int measureCount;
+
+    protected IGTRecordWriter gtRecordWriter;
+
+    /**
+     * @param queue          queue of parsed flat-table rows to build from
+     * @param cube           target cube instance
+     * @param dictionaryMap  dictionaries for all dictionary-encoded columns
+     * @param gtRecordWriter destination for the built cuboid records
+     */
+    public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+            throw new IllegalArgumentException("dictionary cannot be empty");
+        }
+        this.queue = queue;
+        this.desc = cube.getDescriptor();
+        this.cuboidScheduler = new CuboidScheduler(desc);
+        this.dictionaryMap = dictionaryMap;
+        this.gtRecordWriter = gtRecordWriter;
+        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        this.measureCodec = new MeasureCodec(desc.getMeasures());
+
+        Map<String, Integer> measureIndexMap = Maps.newHashMap();
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+        measureCount = desc.getMeasures().size();
+
+        List<MeasureDesc> measureDescsList = Lists.newArrayList();
+        hbaseMeasureRefIndex = new int[measureCount];
+        int measureRef = 0;
+        for (HBaseColumnFamilyDesc familyDesc : desc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                    for (int j = 0; j < measureCount; j++) {
+                        if (desc.getMeasures().get(j).equals(measure)) {
+                            measureDescsList.add(measure);
+                            hbaseMeasureRefIndex[measureRef] = j;
+                            break;
+                        }
+                    }
+                    measureRef++;
+                }
+            }
+        }
+
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescsList.get(i);
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+            measureIndexMap.put(measureDesc.getName(), i);
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+
+        this.dependentMeasures = Maps.newHashMap();
+        for (int i = 0; i < measureCount; i++) {
+            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                dependentMeasures.put(i, index);
+            }
+        }
+
+        this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
+    }
+
+    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
+        GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
+        GTComboStore store = new GTComboStore(info, memStore);
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
+    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
+        BitSet parentDimensions = columnBitSets.getFirst();
+        BitSet measureColumns = columnBitSets.getSecond();
+        BitSet childDimensions = (BitSet) parentDimensions.clone();
+
+        long mask = Long.highestOneBit(parentCuboidId);
+        long childCuboidId = cuboidId;
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboidId);
+        int index = 0;
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parentCuboidId) > 0) {
+                if ((mask & childCuboidId) == 0) {
+                    // this dimension is absent in the child and will be aggregated away
+                    childDimensions.set(index, false);
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+    }
+
+    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        IGTScanner scanner = gridTable.scan(req);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
+        GTBuilder builder = newGridTable.rebuild();
+
+        BitSet allNeededColumns = new BitSet();
+        allNeededColumns.or(aggregationColumns);
+        allNeededColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int counter = 0;
+        ByteArray byteArray = new ByteArray(8);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+        try {
+            BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
+            for (Integer i : dependentMeasures.keySet()) {
+                dependentMetrics.set(allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i));
+            }
+
+            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
+
+            for (GTRecord record : scanner) {
+                counter++;
+                for (int i = allNeededColumns.nextSetBit(0), index = 0; i >= 0; i = allNeededColumns.nextSetBit(i + 1), index++) {
+                    newRecord.set(index, record.get(i));
+                }
+
+                if (dependentMeasures.size() > 0) {
+                    // update measures which have 'dependent_measure_ref'
+                    newRecord.getValues(dependentMetrics, hllObjects);
+
+                    for (Integer i : dependentMeasures.keySet()) {
+                        for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
+                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
+                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
+
+                                byteBuffer.clear();
+                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
+                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
+                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
+                            }
+                        }
+                    }
+                }
+
+                builder.write(newRecord);
+            }
+        } finally {
+            builder.close();
+        }
+        logger.info("Cuboid " + cuboidId + " has " + counter + " rows");
+
+        return newGridTable;
+    }
+
+    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+        BitSet dimension = new BitSet();
+        dimension.set(0, bitSet.cardinality());
+        BitSet metrics = new BitSet();
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+        return new Pair<BitSet, BitSet>(dimension, metrics);
+    }
+
+    private Object[] buildKey(List<String> row) {
+        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+        Object[] key = new Object[keySize];
+
+        for (int i = 0; i < keySize; i++) {
+            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+        }
+
+        return key;
+    }
+
+    private Object[] buildValue(List<String> row) {
+
+        Object[] values = new Object[measureCount];
+        MeasureDesc measureDesc = null;
+
+        for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+            int i = hbaseMeasureRefIndex[position];
+            measureDesc = measureDescs[i];
+
+            Object value = null;
+            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+            FunctionDesc function = desc.getMeasures().get(i).getFunction();
+            if (function.isCount() || function.isHolisticCountDistinct()) {
+                // note for holistic count distinct, this value will be ignored
+                value = ONE;
+            } else if (flatTableIdx == null) {
+                value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+            } else if (flatTableIdx.length == 1) {
+                value = measureCodec.getSerializer(i).valueOf(Bytes.toBytes(row.get(flatTableIdx[0])));
+            } else {
+                // concatenate multiple source columns into one byte array
+                byte[] result = null;
+                for (int x = 0; x < flatTableIdx.length; x++) {
+                    byte[] split = Bytes.toBytes(row.get(flatTableIdx[x]));
+                    if (result == null) {
+                        result = Arrays.copyOf(split, split.length);
+                    } else {
+                        byte[] newResult = new byte[result.length + split.length];
+                        System.arraycopy(result, 0, newResult, 0, result.length);
+                        System.arraycopy(split, 0, newResult, result.length, split.length);
+                        result = newResult;
+                    }
+                }
+                value = measureCodec.getSerializer(i).valueOf(result);
+            }
+            values[position] = value;
+        }
+        return values;
+    }
+
+    @Override
+    public void run() {
+        try {
+            logger.info("Create base cuboid " + baseCuboidId);
+            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
+
+            GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
+            final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
+
+            IGTScanner queueScanner = new IGTScanner() {
+
+                @Override
+                public Iterator<GTRecord> iterator() {
+                    return new Iterator<GTRecord>() {
+
+                        List<String> currentObject = null;
+
+                        @Override
+                        public boolean hasNext() {
+                            try {
+                                currentObject = queue.take();
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                            // an empty list is the end-of-stream marker
+                            return currentObject != null && currentObject.size() > 0;
+                        }
+
+                        @Override
+                        public GTRecord next() {
+                            if (currentObject.size() == 0)
+                                throw new IllegalStateException();
+
+                            buildGTRecord(currentObject, baseGTRecord);
+                            return baseGTRecord;
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+
+                @Override
+                public void close() throws IOException {
+                }
+
+                @Override
+                public GTInfo getInfo() {
+                    return baseCuboidGT.getInfo();
+                }
+
+                @Override
+                public int getScannedRowCount() {
+                    return 0;
+                }
+
+                @Override
+                public int getScannedRowBlockCount() {
+                    return 0;
+                }
+            };
+
+            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+            GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+            IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
+
+            int counter = 0;
+            for (GTRecord r : aggregationScanner) {
+                baseGTBuilder.write(r);
+                counter++;
+            }
+            baseGTBuilder.close();
+            aggregationScanner.close();
+
+            logger.info("Base cuboid has " + counter + " rows");
+            SimpleGridTableTree tree = new SimpleGridTableTree();
+            tree.data = baseCuboidGT;
+            tree.id = baseCuboidId;
+            tree.parent = null;
+            if (counter > 0) {
+                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+                Collections.sort(children);
+                for (Long childId : children) {
+                    createNDCuboidGT(tree, baseCuboidId, childId);
+                }
+            }
+            outputGT(baseCuboidId, baseCuboidGT);
+            dropStore(baseCuboidGT);
+
+        } catch (IOException e) {
+            logger.error("Fail to build cube", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void buildGTRecord(List<String> row, GTRecord record) {
+        Object[] dimensions = buildKey(row);
+        Object[] metricsValues = buildValue(row);
+        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+        record.setValues(recordValues);
+    }
+
+    private boolean gc(TreeNode<GridTable> parentNode) {
+        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
+        logger.info("trying to select a node to flush to disk, from: " + StringUtils.join(",", gridTables));
+        for (TreeNode<GridTable> gridTable : gridTables) {
+            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+            if (store.memoryUsage() > 0) {
+                logger.info("cuboid id: " + gridTable.id + " flushing to disk");
+                long t = System.currentTimeMillis();
+                store.switchToDiskStore();
+                logger.info("switch to disk store cost: " + (System.currentTimeMillis() - t) + "ms");
+                waitForGc();
+                return true;
+            }
+        }
+        logger.warn("all ancestor nodes of " + parentNode.id + " have been flushed to disk");
+        return false;
+    }
+
+    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+            @Override
+            public GridTable call() throws Exception {
+                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+            }
+        });
+        try {
+            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            return gridTable;
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof OutOfMemoryError) {
+                logger.warn("Future.get() OutOfMemory, stop the thread");
+            } else {
+                throw new RuntimeException("this should not happen", e);
+            }
+        } catch (TimeoutException e) {
+            logger.warn("Future.get() timeout, stop the thread");
+        }
+        logger.info("shutdown executor service");
+        executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            waitForGc();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
+        }
+        return null;
+    }
+
+    private void waitForGc() {
+        System.gc();
+        logger.info("wait 5 seconds for gc");
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("should not happen", e);
+        }
+    }
+
+    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
+
+        long startTime = System.currentTimeMillis();
+
+//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+//        if (parentStore.memoryUsage() <= 0) {
+//            long t = System.currentTimeMillis();
+//            parentStore.switchToMemStore();
+//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+//        }
+
+        GridTable currentCuboid;
+        while (true) {
+            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+            if (currentCuboid != null) {
+                break;
+            } else {
+                logger.warn("create child cuboid: " + cuboidId + " from parent: " + parentCuboidId + " failed, prepare to gc");
+                if (gc(parentNode)) {
+                    continue;
+                } else {
+                    logger.warn("all parent nodes have been flushed to disk, memory is still insufficient");
+                    throw new RuntimeException("all parent nodes have been flushed to disk, memory is still insufficient");
+                }
+            }
+        }
+        SimpleGridTableTree node = new SimpleGridTableTree();
+        node.parent = parentNode;
+        node.data = currentCuboid;
+        node.id = cuboidId;
+        parentNode.children.add(node);
+
+        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        if (!children.isEmpty()) {
+            Collections.sort(children); // sort cuboids
+            for (Long childId : children) {
+                createNDCuboidGT(node, cuboidId, childId);
+            }
+        }
+
+        // output the grid table
+        outputGT(cuboidId, currentCuboid);
+        dropStore(currentCuboid);
+        parentNode.children.remove(node);
+        if (parentNode.children.size() > 0) {
+            logger.info("cuboid: " + cuboidId + " has finished, parent node: " + parentNode.id + " needs to switch to mem store");
+            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+        }
+    }
+
+    private void dropStore(GridTable gt) throws IOException {
+        ((GTComboStore) gt.getStore()).drop();
+    }
+
+    private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        IGTScanner scanner = gridTable.scan(req);
+        for (GTRecord record : scanner) {
+            this.gtRecordWriter.write(cuboidId, record);
+        }
+        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
+    }
+
+    private static class TreeNode<T> {
+        T data;
+        long id;
+        TreeNode<T> parent;
+        List<TreeNode<T>> children = Lists.newArrayList();
+
+        List<TreeNode<T>> getAncestorList() {
+            ArrayList<TreeNode<T>> result = Lists.newArrayList();
+            TreeNode<T> parent = this;
+            while (parent != null) {
+                result.add(parent);
+                parent = parent.parent;
+            }
+            return Lists.reverse(result);
+        }
+
+        @Override
+        public String toString() {
+            return id + "";
+        }
+    }
+
+    private static class SimpleGridTableTree extends TreeNode<GridTable> {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 755b490..56435bd 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -61,16 +61,19 @@ import java.util.concurrent.TimeUnit;
 public class IIStreamBuilder extends StreamBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(IIStreamBuilder.class);
+    private static final int BATCH_BUILD_INTERVAL_THRESHOLD = 2 * 60 * 1000;
 
     private final IIDesc desc;
     private final HTableInterface hTable;
     private final SliceBuilder sliceBuilder;
     private final int shardId;
     private final String streaming;
+    private final int batchSize;
     private StreamingManager streamingManager;
 
     public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
-        super(queue, iiDesc.getSliceSize());
+        super(queue);
+        this.batchSize = iiDesc.getSliceSize();
         this.streaming = streaming;
         this.desc = iiDesc;
         this.shardId = shard;
@@ -117,6 +120,16 @@ public class IIStreamBuilder extends StreamBuilder {
         }
     }
 
+    @Override
+    protected int batchInterval() {
+        return BATCH_BUILD_INTERVAL_THRESHOLD;
+    }
+
+    @Override
+    protected int batchSize() {
+        return batchSize;
+    }
+
     private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
         List<Put> data = Lists.newArrayList();
         for (IIRow row : codec.encodeKeyValue(slice)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..b530cdc
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/cube/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.streaming.cube;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Benchmark that drives InMemCubeBuilder with rows read from a local flat-table file.
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        final CubeDesc desc = cubeSegment.getCubeDesc();
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // collect the dictionary of every dictionary-encoded rowkey column
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                    }
+                    logger.info("Dictionary for " + col + " was put into dictionary map.");
+                    dictionaryMap.put(col, dict);
+                }
+            }
+        }
+        return dictionaryMap;
+    }
+
+    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+
+    private void loadDataFromLocalFile(LinkedBlockingQueue<List<String>> queue) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int counter = 0;
+        try {
+            while ((line = br.readLine()) != null) {
+                queue.put(Arrays.asList(line.split("\t")));
+                counter++;
+                if (counter == BENCHMARK_RECORD_LIMIT) {
+                    break;
+                }
+            }
+        } finally {
+            br.close();
+        }
+        // an empty list signals the end of the stream
+        queue.put(Collections.<String> emptyList());
+    }
+
+    private void loadDataFromRandom(LinkedBlockingQueue<List<String>> queue) throws IOException, InterruptedException {
+        queue.put(Collections.<String> emptyList());
+    }
+
+    @Test
+    public void test() throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+        final CubeSegment cubeSegment = cube.getFirstSegment();
+
+        LinkedBlockingQueue<List<String>> queue = new LinkedBlockingQueue<List<String>>();
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cubeBuilder);
+        loadDataFromLocalFile(queue);
+        future.get();
+        logger.info("stream build finished");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a6a9d940/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
deleted file mode 100644
index b3d7742..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming.invertedindex;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.streaming.JsonStreamParser;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StreamBuilder;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-/**
- */
-public class PrintOutStreamBuilder extends StreamBuilder {
-
-    private final List<TblColRef> allColumns;
-
-    public PrintOutStreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue, int sliceSize, List<TblColRef> allColumns) {
-        super(streamMessageQueue, sliceSize);
-        setStreamParser(new JsonStreamParser(allColumns));
-        this.allColumns = allColumns;
-    }
-
-    @Override
-    protected void build(List<StreamMessage> streamsToBuild) throws Exception {
-        for (StreamMessage streamMessage : streamsToBuild) {
-            final List<String> row = getStreamParser().parse(streamMessage);
-            System.out.println("offset:" + streamMessage.getOffset() + " " + StringUtils.join(row, ","));
-        }
-    }
-
-    @Override
-    protected void onStop() {
-
-    }
-}
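The IIStreamBuilder change above replaces the old sliceSize constructor argument with two template-method hooks, batchSize() and batchInterval(), so subclasses can bound a micro-batch by count and by elapsed time. The StreamBuilder base class is not part of this diff, so the following is only a sketch under assumptions (the MicroBatcher name and loop are hypothetical, not Kylin's actual base class) of how a consuming loop could combine the two hooks: flush when either batchSize() messages have accumulated or batchInterval() milliseconds have passed since the last build.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

abstract class MicroBatcher<T> implements Runnable {
    private final BlockingQueue<T> queue;

    MicroBatcher(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    protected abstract int batchSize();      // max messages per batch
    protected abstract int batchInterval();  // max millis between builds
    protected abstract void build(List<T> batch) throws Exception;

    @Override
    public void run() {
        List<T> batch = new ArrayList<T>();
        long deadline = System.currentTimeMillis() + batchInterval();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // wait at most until the deadline for the next message
                T msg = queue.poll(Math.max(1, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
                if (msg != null)
                    batch.add(msg);
                // flush on size threshold or elapsed-time threshold
                if (batch.size() >= batchSize() || System.currentTimeMillis() >= deadline) {
                    if (!batch.isEmpty()) {
                        build(batch);
                        batch.clear();
                    }
                    deadline = System.currentTimeMillis() + batchInterval();
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Under this reading, IIStreamBuilder's overrides mean a slice is built once iiDesc.getSliceSize() messages arrive, or every two minutes (BATCH_BUILD_INTERVAL_THRESHOLD) on a slow stream, whichever comes first.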