Repository: incubator-kylin

Updated Branches:
  refs/heads/streaming-cubing 838c8717e -> 1ecf68185
logger


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1ecf6818
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1ecf6818
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1ecf6818

Branch: refs/heads/streaming-cubing
Commit: 1ecf68185f220b88e9c489ef29fce72968be69fb
Parents: 838c871
Author: qianhao.zhou <qianz...@ebay.com>
Authored: Thu May 28 18:51:00 2015 +0800
Committer: qianhao.zhou <qianz...@ebay.com>
Committed: Thu May 28 18:51:00 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamBuilder.java | 58 +++++++++++---------
 1 file changed, 31 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1ecf6818/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 846ade5..a2a60fa 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,6 +9,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -88,33 +89,36 @@ public class CubeStreamBuilder extends StreamBuilder {
             return;
         }
         final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
-        long startOffset = streamMessages.get(0).getOffset();
-        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
-        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
-        blockingQueue.put(Collections.<String>emptyList());
-
-        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
-        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
-
-        final Configuration conf = HadoopUtil.getCurrentConfiguration();
-        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
-        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
-        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
-
-        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
-        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
-
-        final HTableInterface hTable = createHTable(cubeSegment);
-
-        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
-                dictionaryMap, gtRecordWriter);
-
-        executorService.submit(inMemCubeBuilder).get();
-        gtRecordWriter.flush();
-        commitSegment(cubeSegment);
+        for (List<String> parsedStreamMessage : parsedStreamMessages) {
+            logger.info(StringUtils.join(parsedStreamMessage, ","));
+        }
+//        long startOffset = streamMessages.get(0).getOffset();
+//        long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+//        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
+//        blockingQueue.put(Collections.<String>emptyList());
+//
+//        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+//        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+//        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis());
+//        final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+//
+//        final Configuration conf = HadoopUtil.getCurrentConfiguration();
+//        final String outputPath = "/tmp/kylin/cuboidstatistics/" + UUID.randomUUID().toString();
+//        FactDistinctColumnsReducer.writeCuboidStatistics(conf, outputPath, samplingResult, 100);
+//        ResourceStore.getStore(kylinConfig).putResource(cubeSegment.getStatisticsResourcePath(), FileSystem.get(conf).open(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION)), 0);
+//
+//        final Map<TblColRef, Dictionary<?>> dictionaryMap = buildDictionary(getTblColRefMap(cubeInstance), parsedStreamMessages);
+//        writeDictionary(cubeSegment, dictionaryMap, startOffset, endOffset);
+//
+//        final HTableInterface hTable = createHTable(cubeSegment);
+//
+//        final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
+//        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance,
+//                dictionaryMap, gtRecordWriter);
+//
+//        executorService.submit(inMemCubeBuilder).get();
+//        gtRecordWriter.flush();
+//        commitSegment(cubeSegment);
     }
 
     private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
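
The net effect of the commit: the streaming build step no longer builds a segment at all; it only logs each parsed row, comma-joined via the newly imported commons-lang3 StringUtils. Below is a minimal standalone sketch of that logging loop for readers who want to run it in isolation. The class name, sample rows, and slf4j logger setup are illustrative assumptions, not taken from the Kylin source.

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParsedMessageLoggingSketch {

    private static final Logger logger = LoggerFactory.getLogger(ParsedMessageLoggingSketch.class);

    // Mirrors the loop added in the diff: each parsed stream message is a
    // List<String> of column values, logged as one comma-joined line.
    static void logParsedMessages(List<List<String>> parsedStreamMessages) {
        for (List<String> parsedStreamMessage : parsedStreamMessages) {
            logger.info(StringUtils.join(parsedStreamMessage, ","));
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample rows, standing in for parseStream(streamMessages).
        logParsedMessages(Arrays.asList(
                Arrays.asList("2015-05-28", "site_a", "100"),
                Arrays.asList("2015-05-28", "site_b", "42")));
    }
}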
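
The disabled code also shows the hand-off pattern the builder used before this commit: the parsed rows were preloaded into a LinkedBlockingQueue and an empty list was enqueued afterwards as an end-of-stream marker for the InMemCubeBuilder consumer. A hedged sketch of that pattern follows; the consumer loop is an assumption about how the marker is interpreted, not Kylin's actual InMemCubeBuilder logic.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffSketch {

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for parsedStreamMessages; the real rows come from parseStream().
        List<List<String>> parsedStreamMessages = Arrays.asList(
                Arrays.asList("a", "1"),
                Arrays.asList("b", "2"));

        // Same construction as the commented-out code: the queue is seeded with
        // all parsed rows, then an empty list is put() as the end marker.
        LinkedBlockingQueue<List<String>> blockingQueue =
                new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
        blockingQueue.put(Collections.<String>emptyList());

        // Illustrative consumer: drain the queue until the empty-list marker
        // appears, which is presumably how InMemCubeBuilder detects the end of
        // input. This loop is a sketch, not the Kylin implementation.
        List<String> row;
        while (!(row = blockingQueue.take()).isEmpty()) {
            System.out.println(String.join(",", row));
        }
    }
}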