Repository: kylin Updated Branches: refs/heads/master 9410b015b -> 350547e6e
http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsReducer2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsReducer2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsReducer2.java new file mode 100644 index 0000000..b5aeef6 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsReducer2.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reducer of the "fact distinct columns" step.
+ *
+ * <p>{@link #setup} assigns each reduce task exactly one of three roles, based on its task id:
+ * <ul>
+ * <li>statistics enabled and last task: merges the per-cuboid HyperLogLog counters and writes
+ * the cuboid statistics in {@link #doCleanup};</li>
+ * <li>statistics enabled and second-to-last task: keeps the first and the latest value seen for
+ * the partition date column;</li>
+ * <li>otherwise: collects the values of the dictionary column at index {@code taskId} and writes
+ * them, one per line, to a file named after the column.</li>
+ * </ul>
+ *
+ * <p>In every role the first byte of the key is skipped before the payload is read (presumably a
+ * routing marker written by the mapper - confirm against the mapper).
+ */
+public class FactDistinctColumnsReducer2 extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> {
+
+    /** Number of buffered column values after which they are spilled to the output file. */
+    private static final int SPILL_THRESHOLD = 1000000;
+
+    private List<TblColRef> columnList;
+    private String statisticsOutput = null;
+    private List<Long> baseCuboidRowCountInMappers;
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+    // NOTE(review): never assigned in this class, so it stays 0 unless a subclass sets it.
+    // The base-cuboid branch in doReduce() may therefore never match and the mapper number
+    // passed to CubeStatsWriter would be 0 - confirm and initialize it in setup().
+    protected long baseCuboidId;
+    protected CubeDesc cubeDesc;
+    private long totalRowsBeforeMerge = 0;
+    private int samplingPercentage;
+    private List<ByteArray> colValues;
+    private TblColRef col = null;
+    private boolean isStatistics = false;
+    private boolean isPartitionCol = false;
+    private KylinConfig cubeConfig;
+    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer2.class);
+
+    /**
+     * Loads the cube metadata and decides which role this reduce task plays.
+     *
+     * @param context the reducer context providing the job configuration and the task id
+     * @throws IOException if the Kylin metadata cannot be loaded
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeConfig = cube.getConfig();
+        cubeDesc = cube.getDescriptor();
+        columnList = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+
+        boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
+        int numberOfTasks = context.getNumReduceTasks();
+        int taskId = context.getTaskAttemptID().getTaskID().getId();
+
+        if (collectStatistics && (taskId == numberOfTasks - 1)) {
+            // hll
+            isStatistics = true;
+            statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
+            baseCuboidRowCountInMappers = Lists.newArrayList();
+            cuboidHLLMap = Maps.newHashMap();
+            samplingPercentage = Integer.parseInt(conf.get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+        } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
+            // partition col
+            isStatistics = false;
+            isPartitionCol = true;
+            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            colValues = Lists.newLinkedList();
+        } else {
+            // col
+            isStatistics = false;
+            isPartitionCol = false;
+            col = columnList.get(taskId);
+            colValues = Lists.newLinkedList();
+        }
+    }
+
+    /**
+     * Consumes one key group according to the role chosen in {@link #setup}.
+     *
+     * @param skey    the sortable key; only its text payload is used here
+     * @param values  serialized HLL registers (statistics role); ignored by the other roles
+     * @param context the reducer context, used when buffered values are spilled
+     * @throws IOException if spilling the buffered values fails
+     */
+    @Override
+    protected void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        Text key = skey.getText();
+        if (isStatistics) {
+            // for hll: the long stored at offset 1 of the key is used as the cuboid id
+            long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+            for (Text value : values) {
+                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(cubeConfig.getCubeStatsHLLPrecision());
+                ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
+                hll.readRegisters(bf);
+
+                // read the estimate once, before this counter is possibly merged into
+                long estimate = hll.getCountEstimate();
+                totalRowsBeforeMerge += estimate;
+
+                if (cuboidId == baseCuboidId) {
+                    baseCuboidRowCountInMappers.add(estimate);
+                }
+
+                HyperLogLogPlusCounter merged = cuboidHLLMap.get(cuboidId);
+                if (merged != null) {
+                    merged.merge(hll);
+                } else {
+                    cuboidHLLMap.put(cuboidId, hll);
+                }
+            }
+        } else if (isPartitionCol) {
+            // for partition col min/max value: slot 0 keeps the first value seen, slot 1 is
+            // overwritten with the latest one (min/max only if keys arrive in sorted order)
+            ByteArray value = new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1));
+            if (colValues.size() > 1) {
+                colValues.set(1, value);
+            } else {
+                colValues.add(value);
+            }
+        } else {
+            colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+            if (colValues.size() >= SPILL_THRESHOLD) { //spill every 1 million
+                logger.info("spill values to disk...");
+                outputDistinctValues(col, colValues, context);
+                colValues.clear();
+            }
+        }
+    }
+
+    /**
+     * Writes the given values, one per line, to the file named after the column under the job
+     * output path. The file is appended to when it already exists, otherwise it is created.
+     *
+     * @param col     the column whose name is used as the output file name
+     * @param values  the values to write
+     * @param context the reducer context providing the job configuration
+     * @throws IOException if the file cannot be opened, written or closed
+     */
+    private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = FileSystem.get(conf);
+        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, col.getName());
+
+        FSDataOutputStream out = null;
+        try {
+            if (fs.exists(outputFile)) {
+                out = fs.append(outputFile);
+                logger.info("append file " + outputFile);
+            } else {
+                out = fs.create(outputFile);
+                logger.info("create file " + outputFile);
+            }
+
+            for (ByteArray value : values) {
+                out.write(value.array(), value.offset(), value.length());
+                out.write('\n');
+            }
+            // close on the success path so that a failed flush/close is reported, not swallowed
+            out.close();
+        } finally {
+            // cleanup for the error path; closing an already closed stream has no effect
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Flushes what is still buffered: the remaining column values for the column roles, or the
+     * merged cuboid statistics for the statistics role.
+     *
+     * @param context the reducer context providing the job configuration
+     * @throws IOException if writing the output fails
+     */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+
+        if (!isStatistics) {
+            if (!colValues.isEmpty()) {
+                outputDistinctValues(col, colValues, context);
+                colValues.clear();
+            }
+            return;
+        }
+
+        //output the hll info;
+        long grandTotal = 0;
+        for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+            grandTotal += hll.getCountEstimate();
+        }
+        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+
+        int mapperNumber = baseCuboidRowCountInMappers.size();
+
+        writeMapperAndCuboidStatistics(context); // for human check
+        CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+    }
+
+    /**
+     * Writes a plain-text summary of the collected statistics into the statistics output
+     * directory.
+     *
+     * @param context the reducer context providing the job configuration
+     * @throws IOException if the summary file cannot be created, written or closed
+     */
+    private void writeMapperAndCuboidStatistics(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME));
+
+        try {
+            List<Long> allCuboids = Lists.newArrayList(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
+
+            writeLine(out, "Total cuboid number: \t" + allCuboids.size());
+            writeLine(out, "Sampling percentage: \t" + samplingPercentage);
+
+            writeLine(out, "The following statistics are collected based on sampling data.");
+            writeLine(out, "Number of Mappers: " + baseCuboidRowCountInMappers.size());
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                long rowCount = baseCuboidRowCountInMappers.get(i);
+                if (rowCount > 0) {
+                    writeLine(out, "Base Cuboid in Mapper " + i + " row count: \t " + rowCount);
+                }
+            }
+
+            long grandTotal = 0;
+            for (long cuboidId : allCuboids) {
+                long rowCount = cuboidHLLMap.get(cuboidId).getCountEstimate();
+                grandTotal += rowCount;
+                writeLine(out, "Cuboid " + cuboidId + " row count is: \t " + rowCount);
+            }
+
+            writeLine(out, "Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
+            writeLine(out, "After merge, the cube has row count: \t " + grandTotal);
+
+            if (grandTotal > 0) {
+                // floating-point division, consistent with the ratio computed in doCleanup()
+                writeLine(out, "The mapper overlap ratio is: \t" + ((double) totalRowsBeforeMerge / grandTotal));
+            }
+
+            // close on the success path so that a failed flush/close is reported, not swallowed
+            out.close();
+        } finally {
+            // cleanup for the error path; closing an already closed stream has no effect
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Writes {@code msg} followed by a newline, encoded as UTF-8 so that the output does not
+     * depend on the platform default charset.
+     */
+    private void writeLine(FSDataOutputStream out, String msg) throws IOException {
+        out.write(msg.getBytes(StandardCharsets.UTF_8));
+        out.write('\n');
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java index cadbcbf..a3351fa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java @@ -95,7 +95,8 @@ public class SelfDefineSortableKey implements WritableComparable<SelfDefineSorta @Override public void readFields(DataInput dataInput) throws IOException { - dataInput.readByte(); + this.typeId = dataInput.readByte(); + this.text = new Text(); text.readFields(dataInput); } http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index 01d47b8..df68f76 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -78,7 +78,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { values.add(new byte[] { 102, 102, 102 }); 
Dictionary<?> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); - ((TrieDictionary) dict).dump(System.out); + dict.dump(System.out); return newDictInfo; } @@ -130,7 +130,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { values.add(new byte[] { 98, 98, 98 }); Dictionary<?> dict = DictionaryGenerator.buildDictionary(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values)); dictionaryManager.trySaveNewDict(dict, newDictInfo); - ((TrieDictionary) dict).dump(System.out); + dict.dump(System.out); segment.putDictResPath(lfn, newDictInfo.getResourcePath()); segment.putDictResPath(lsi, sharedDict.getResourcePath()); http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java index 70197ac..554ee9c 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java @@ -35,37 +35,37 @@ import static org.junit.Assert.assertTrue; public class NumberDictionaryForestTest { @Test - public void testNumberDictionaryForestLong(){ + public void testNumberDictionaryForestLong() { List<String> list = randomLongData(10); - testData(list,TypeFlag.INTEGER_FAMILY_TYPE); + testData(list, TypeFlag.INTEGER_FAMILY_TYPE); } @Test - public void testNumberDictionaryForestDouble(){ + public void testNumberDictionaryForestDouble() { List<String> list = randomDoubleData(10); - testData(list,TypeFlag.DOUBLE_FAMILY_TYPE); + 
testData(list, TypeFlag.DOUBLE_FAMILY_TYPE); } - private void testData(List<String> list,TypeFlag flag){ + private void testData(List<String> list, TypeFlag flag) { //stimulate map-reduce job - ArrayList<SelfDefineSortableKey> keyList = createKeyList(list,(byte)flag.ordinal()); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(list, (byte) flag.ordinal()); Collections.sort(keyList); //build tree NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>( - new StringBytesConverter(),0); - TrieDictionaryForestBuilder.MaxTrieTreeSize = 0; - for(SelfDefineSortableKey key : keyList){ + new StringBytesConverter(), 0, 0); + + for (SelfDefineSortableKey key : keyList) { String fieldValue = printKey(key); b.addValue(fieldValue); } NumberDictionaryForest<String> dict = b.build(); dict.dump(System.out); ArrayList<Integer> resultIds = new ArrayList<>(); - for(SelfDefineSortableKey key : keyList){ + for (SelfDefineSortableKey key : keyList) { String fieldValue = getFieldValue(key); resultIds.add(dict.getIdFromValue(fieldValue)); - assertEquals(fieldValue,dict.getValueFromId(dict.getIdFromValue(fieldValue))); + assertEquals(fieldValue, dict.getValueFromId(dict.getIdFromValue(fieldValue))); } assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() { @Override @@ -83,7 +83,7 @@ public class NumberDictionaryForestTest { testData.add("100"); //TrieDictionaryForestBuilder.MaxTrieTreeSize = 0; NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>(new StringBytesConverter()); - for(String str : testData) + for (String str : testData) b.addValue(str); NumberDictionaryForest<String> dict = b.build(); dict = testSerialize(dict); @@ -94,20 +94,20 @@ public class NumberDictionaryForestTest { } @Test - public void testVerySmallDouble(){ + public void testVerySmallDouble() { List<String> testData = new ArrayList<>(); - testData.add(-1.0+""); - testData.add(Double.MIN_VALUE+""); + testData.add(-1.0 + ""); + 
testData.add(Double.MIN_VALUE + ""); testData.add("1.01"); testData.add("2.0"); NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>(new StringBytesConverter()); - for(String str : testData) + for (String str : testData) b.addValue(str); NumberDictionaryForest<String> dict = b.build(); dict.dump(System.out); NumberDictionaryBuilder<String> b2 = new NumberDictionaryBuilder<>(new StringBytesConverter()); - for(String str : testData) + for (String str : testData) b2.addValue(str); NumberDictionary<String> dict2 = b2.build(0); dict2.dump(System.out); @@ -133,31 +133,31 @@ public class NumberDictionaryForestTest { } } - private List<String> randomLongData(int count){ + private List<String> randomLongData(int count) { Random rand = new Random(System.currentTimeMillis()); ArrayList<String> list = new ArrayList<>(); - for(int i=0;i<count;i++){ - list.add(rand.nextLong()+""); + for (int i = 0; i < count; i++) { + list.add(rand.nextLong() + ""); } - list.add(Long.MAX_VALUE+""); - list.add(Long.MIN_VALUE+""); + list.add(Long.MAX_VALUE + ""); + list.add(Long.MIN_VALUE + ""); return list; } - private List<String> randomDoubleData(int count){ + private List<String> randomDoubleData(int count) { Random rand = new Random(System.currentTimeMillis()); ArrayList<String> list = new ArrayList<>(); - for(int i=0;i<count;i++){ - list.add(rand.nextDouble()+""); + for (int i = 0; i < count; i++) { + list.add(rand.nextDouble() + ""); } list.add("-1"); return list; } - private List<String> randomStringData(int count){ + private List<String> randomStringData(int count) { Random rand = new Random(System.currentTimeMillis()); ArrayList<String> list = new ArrayList<>(); - for(int i=0;i<count;i++){ + for (int i = 0; i < count; i++) { list.add(UUID.randomUUID().toString()); } list.add("123"); @@ -165,47 +165,47 @@ public class NumberDictionaryForestTest { return list; } - private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){ + 
private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList, byte typeFlag) { int partationId = 0; ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>(); - for(String str : strNumList){ + for (String str : strNumList) { ByteBuffer keyBuffer = ByteBuffer.allocate(4096); int offset = keyBuffer.position(); keyBuffer.put(Bytes.toBytes(partationId)[3]); keyBuffer.put(Bytes.toBytes(str)); //System.out.println(Arrays.toString(keyBuffer.array())); - byte[] valueField = Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1); + byte[] valueField = Bytes.copy(keyBuffer.array(), 1, keyBuffer.position() - offset - 1); //System.out.println("new string:"+new String(valueField)); //System.out.println("arrays toString:"+Arrays.toString(valueField)); Text outputKey = new Text(); - outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset); - SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag,outputKey); + outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); + SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag, outputKey); keyList.add(sortableKey); } return keyList; } - private String printKey(SelfDefineSortableKey key){ + private String printKey(SelfDefineSortableKey key) { byte[] data = key.getText().getBytes(); - byte[] fieldValue = Bytes.copy(data,1,data.length-1); - System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new String(fieldValue)); + byte[] fieldValue = Bytes.copy(data, 1, data.length - 1); + System.out.println("type flag:" + key.getTypeId() + " fieldValue:" + new String(fieldValue)); return new String(fieldValue); } - private String getFieldValue(SelfDefineSortableKey key){ + private String getFieldValue(SelfDefineSortableKey key) { byte[] data = key.getText().getBytes(); - byte[] fieldValue = Bytes.copy(data,1,data.length-1); + byte[] fieldValue = Bytes.copy(data, 1, data.length - 1); return new String(fieldValue); } - private<T> boolean 
isIncreasedOrder(List<T> list, Comparator<T> comp){ + private <T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp) { int flag; T previous = null; - for(T t : list){ - if(previous == null) previous = t; - else{ - flag = comp.compare(previous,t); - if(flag > 0) return false; + for (T t : list) { + if (previous == null) previous = t; + else { + flag = comp.compare(previous, t); + if (flag > 0) return false; previous = t; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 0988536..93b86c9 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -63,7 +63,7 @@ kylin.job.retry=0 # you will have to specify kylin.job.remote.cli.hostname, kylin.job.remote.cli.username and kylin.job.remote.cli.password # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands) -kylin.job.run.as.remote.cmd=false +kylin.job.run.as.remote.cmd=true # Only necessary when kylin.job.run.as.remote.cmd=true kylin.job.remote.cli.hostname=sandbox @@ -160,4 +160,7 @@ kylin.web.contact_mail= kylin.query.metrics.percentiles.intervals=60, 360, 3600 # Env DEV|QA|PROD -deploy.env=DEV \ No newline at end of file +deploy.env=DEV + +#default 500MB +kylin.dictionary.forest.trie.size.max_mb=500 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/734a4f98/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java 
b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index 52461c4..e1303e4 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -491,7 +491,10 @@ public class KylinTestBase { ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort); try { + //compare before junit // compare the result + System.out.println("h2 Table rows count:"+h2Table.getRowCount()); + System.out.println("kylin Table rows count:"+kylinTable.getRowCount()); Assertion.assertEquals(h2Table, kylinTable); } catch (Throwable t) { printInfo("execAndCompQuery failed on: " + sqlFile.getAbsolutePath());
