Repository: incubator-kylin Updated Branches: refs/heads/2.x-staging effd9a821 -> c3880be69
KYLIN-980 FactDistinctColumnsJob support high cardinality columns Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c3880be6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c3880be6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c3880be6 Branch: refs/heads/2.x-staging Commit: c3880be697ce6038604e4dfa5cd950710b5e811e Parents: effd9a8 Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Nov 24 16:42:35 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Nov 24 16:43:06 2015 +0800 ---------------------------------------------------------------------- .../mr/steps/FactDistinctColumnsReducer.java | 55 +++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c3880be6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 568dd77..66d63fb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -20,11 +20,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -96,24 +92,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, TblColRef col = columnList.get((int) key.get()); HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { + + Text textValue = null; + Iterator<Text> valueItr = values.iterator(); + while (valueItr.hasNext()) { + textValue = valueItr.next(); ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); set.add(value); - } - - Configuration conf = context.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - FSDataOutputStream out = fs.create(new Path(outputPath, col.getName())); - - try { - for (ByteArray value : set) { - out.write(value.array(), value.offset(), value.length()); - out.write('\n'); + if (set.size() >= 5000000) { // output when count reach 5 Million + outputDistinctValues(col, set, context); + set.clear(); } - } finally { - out.close(); } + + if (set.isEmpty() == false) { + outputDistinctValues(col, set, context); + } + } else { // for hll long cuboidId = 0 - key.get(); @@ -138,6 +133,28 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, } } + + private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException { + final Configuration conf = context.getConfiguration(); + final FileSystem fs = FileSystem.get(conf); + final String outputPath = conf.get(BatchConstants.OUTPUT_PATH); + final Path outputFile = new Path(outputPath, col.getName()); + FSDataOutputStream out; + if (fs.exists(outputFile)) { + out = fs.append(outputFile); + } else { + out = fs.create(outputFile); + } + + try { + for (ByteArray value : set) { + out.write(value.array(), value.offset(), value.length()); + out.write('\n'); + } + } finally { + out.close(); + } + } @Override protected void cleanup(Context context) throws IOException, InterruptedException {