KYLIN-2217 Code review, refactor IDictionaryBuilder
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/30cb1ac6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/30cb1ac6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/30cb1ac6 Branch: refs/heads/KYLIN-1875 Commit: 30cb1ac678b063ac164acd27591e8a1d0becafa0 Parents: 1af08e4 Author: Li Yang <[email protected]> Authored: Wed Nov 30 15:30:15 2016 +0800 Committer: Yang Li <[email protected]> Committed: Wed Nov 30 21:00:17 2016 +0800 ---------------------------------------------------------------------- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 2 +- .../apache/kylin/dict/DictionaryGenerator.java | 165 ++++++++++++------- .../apache/kylin/dict/DictionaryManager.java | 16 +- .../apache/kylin/dict/DictionaryProvider.java | 4 +- .../dict/DictionaryReducerLocalGenerator.java | 156 ------------------ .../kylin/dict/GlobalDictionaryBuilder.java | 36 ++-- .../apache/kylin/dict/IDictionaryBuilder.java | 13 +- .../dict/IDictionaryReducerLocalBuilder.java | 31 ---- .../kylin/dict/DictionaryProviderTest.java | 63 +++---- .../engine/mr/steps/CreateDictionaryJob.java | 42 ++--- .../mr/steps/FactDistinctColumnsReducer.java | 115 +++++-------- .../mr/steps/FactDistinctHiveColumnsMapper.java | 9 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 6 +- 13 files changed, 231 insertions(+), 427 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index a4e1df0..163c6ca 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ 
b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -60,7 +60,7 @@ public class DictionaryGeneratorCLI { for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building dictionary for " + col); ReadableTable inpTable = decideInputTable(cubeSeg.getModel(), col, factTableValueProvider); - if (config.isReducerLocalBuildDict() && dictProvider != null) { + if (dictProvider != null) { Dictionary<String> dict = dictProvider.getDictionary(col); if (dict != null) { cubeMgr.saveDictionary(cubeSeg, col, inpTable, dict); http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index 810a392..cd13d59 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -19,12 +19,11 @@ package org.apache.kylin.dict; import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.datatype.DataType; import org.slf4j.Logger; @@ -40,9 +39,7 @@ public class DictionaryGenerator { private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class); - private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; - - public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException { + public static IDictionaryBuilder 
newDictionaryBuilder(DataType dataType) { Preconditions.checkNotNull(dataType, "dataType cannot be null"); // build dict, case by data type @@ -57,16 +54,33 @@ public class DictionaryGenerator { } else { builder = new StringDictBuilder(); } + return builder; + } - return buildDictionary(builder, null, valueEnumerator); + public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException { + return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator); } - public static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator) throws IOException { + static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator) throws IOException { int baseId = 0; // always 0 for now int nSamples = 5; ArrayList<String> samples = new ArrayList<String>(nSamples); - Dictionary<String> dict = builder.build(dictInfo, valueEnumerator, baseId, nSamples, samples); + // init the builder + builder.init(dictInfo, baseId); + + // add values + while (valueEnumerator.moveNext()) { + String value = valueEnumerator.current(); + + boolean accept = builder.addValue(value); + + if (accept && samples.size() < nSamples && samples.contains(value) == false) + samples.add(value); + } + + // build + Dictionary<String> dict = builder.build(); // log a few samples StringBuilder buf = new StringBuilder(); @@ -88,81 +102,114 @@ public class DictionaryGenerator { } private static class DateDictBuilder implements IDictionaryBuilder { + private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; + + private int baseId; + private String datePattern; + @Override - public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { - final int 
BAD_THRESHOLD = 0; - String matchPattern = null; - String value; - - for (String ptn : DATE_PATTERNS) { - matchPattern = ptn; // be optimistic - int badCount = 0; - SimpleDateFormat sdf = new SimpleDateFormat(ptn); - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); - if (value == null || value.length() == 0) - continue; + public void init(DictionaryInfo info, int baseId) throws IOException { + this.baseId = baseId; + } + @Override + public boolean addValue(String value) { + if (StringUtils.isBlank(value)) // empty string is treated as null + return false; + + // detect date pattern on the first value + if (datePattern == null) { + for (String p : DATE_PATTERNS) { try { - sdf.parse(value); - if (returnSamples.size() < nSamples && returnSamples.contains(value) == false) - returnSamples.add(value); - } catch (ParseException e) { - logger.info("Unrecognized date value: " + value); - badCount++; - if (badCount > BAD_THRESHOLD) { - matchPattern = null; - break; - } + DateFormat.stringToDate(value, p); + datePattern = p; + break; + } catch (Exception e) { + // continue; } } - if (matchPattern != null) { - return new DateStrDictionary(matchPattern, baseId); - } + if (datePattern == null) + throw new IllegalArgumentException("Unknown date pattern for input value: " + value); } + + // check the date format + DateFormat.stringToDate(value, datePattern); + return true; + } + + @Override + public Dictionary<String> build() throws IOException { + if (datePattern == null) + datePattern = DATE_PATTERNS[0]; - throw new IllegalStateException("Unrecognized datetime value"); + return new DateStrDictionary(datePattern, baseId); } } private static class TimeDictBuilder implements IDictionaryBuilder { + @Override - public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { + public void init(DictionaryInfo info, int baseId) throws IOException 
{ + } + + @Override + public boolean addValue(String value) { + if (StringUtils.isBlank(value)) // empty string is treated as null + return false; + + // check the time format + DateFormat.stringToMillis(value); + return true; + } + + @Override + public Dictionary<String> build() throws IOException { return new TimeStrDictionary(); // base ID is always 0 } } private static class StringDictBuilder implements IDictionaryBuilder { + TrieDictionaryForestBuilder builder; + @Override - public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { - TrieDictionaryForestBuilder builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId); - String value; - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); - if (value == null) - continue; - builder.addValue(value); - if (returnSamples.size() < nSamples && returnSamples.contains(value) == false) - returnSamples.add(value); - } + public void init(DictionaryInfo info, int baseId) throws IOException { + builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId); + } + + @Override + public boolean addValue(String value) { + if (value == null) + return false; + + builder.addValue(value); + return true; + } + + @Override + public Dictionary<String> build() throws IOException { return builder.build(); } } private static class NumberDictBuilder implements IDictionaryBuilder { + NumberDictionaryForestBuilder builder; + @Override - public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { - NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder(baseId); - String value; - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); - if (StringUtils.isBlank(value)) // empty string is null for numbers - 
continue; - - builder.addValue(value); - if (returnSamples.size() < nSamples && returnSamples.contains(value) == false) - returnSamples.add(value); - } + public void init(DictionaryInfo info, int baseId) throws IOException { + builder = new NumberDictionaryForestBuilder(baseId); + } + + @Override + public boolean addValue(String value) { + if (StringUtils.isBlank(value)) // empty string is treated as null + return false; + + builder.addValue(value); + return true; + } + + @Override + public Dictionary<String> build() throws IOException { return builder.build(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 0caef14..54bc1c4 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -87,10 +87,6 @@ public class DictionaryManager { private KylinConfig config; private LoadingCache<String, DictionaryInfo> dictCache; // resource - - // path ==> - // DictionaryInfo - private DictionaryManager(KylinConfig config) { this.config = config; this.dictCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, DictionaryInfo>() { @@ -276,12 +272,10 @@ public class DictionaryManager { return buildDictionary(model, col, inpTable, null); } - public DictionaryInfo buildDictionary(DataModelDesc model, TblColRef col, ReadableTable inpTable, String builderClass) throws IOException { if (inpTable.exists() == false) return null; - logger.info("building dictionary for " + col); DictionaryInfo dictInfo = createDictionaryInfo(model, col, inpTable); @@ -303,10 +297,12 @@ public class DictionaryManager { IDictionaryValueEnumerator 
columnValueEnumerator = null; try { columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex()); - if (builderClass == null) + if (builderClass == null) { dictionary = DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), columnValueEnumerator); - else - dictionary = DictionaryGenerator.buildDictionary((IDictionaryBuilder) ClassUtil.newInstance(builderClass), dictInfo, columnValueEnumerator); + } else { + IDictionaryBuilder builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass); + dictionary = DictionaryGenerator.buildDictionary(builder, dictInfo, columnValueEnumerator); + } } catch (Exception ex) { throw new RuntimeException("Failed to create dictionary on " + col, ex); } finally { @@ -365,7 +361,7 @@ public class DictionaryManager { while (join != null) { if (join.isInnerJoin() == false) return false; - + TableRef table = join.getFKSide(); join = model.getJoinByPKSide(table); } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java index 6387535..8476f5c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryProvider.java @@ -17,6 +17,8 @@ */ package org.apache.kylin.dict; +import java.io.IOException; + import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.TblColRef; @@ -24,5 +26,5 @@ import org.apache.kylin.metadata.model.TblColRef; * Created by xiefan on 16-11-23. 
*/ public interface DictionaryProvider { - public Dictionary<String> getDictionary(TblColRef col); + public Dictionary<String> getDictionary(TblColRef col) throws IOException; } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java deleted file mode 100644 index 35d379a..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryReducerLocalGenerator.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.dict; - -import com.google.common.base.Preconditions; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.datatype.DataType; - -import java.text.ParseException; -import java.text.SimpleDateFormat; - -/** - * Created by xiefan on 16-11-16. 
- * - * TODO:sample,mergeDict - */ -public class DictionaryReducerLocalGenerator { - - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DictionaryReducerLocalGenerator.class); - - private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; - - public static IDictionaryReducerLocalBuilder getBuilder(DataType dataType) { - Preconditions.checkNotNull(dataType, "dataType cannot be null"); - - IDictionaryReducerLocalBuilder builder; - if (dataType.isDateTimeFamily()) { - if (dataType.isDate()) - builder = new DateDictBuilder(); - else - builder = new TimeDictBuilder(); - } else if (dataType.isNumberFamily()) { - builder = new NumberDictBuilder(0); - } else { - builder = new StringDictBuilder(0); - } - return builder; - } - - private static class DateDictBuilder implements IDictionaryReducerLocalBuilder { - - private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" }; - - private String matchPattern = null; - - private boolean isRecognizeFormat = false; - - private SimpleDateFormat sdf; - - @Override - public Dictionary<String> build(int baseId) throws Exception { - if (isRecognizeFormat) { - return new DateStrDictionary(matchPattern, baseId); - } else { - throw new IllegalStateException("Date format not match"); - } - } - - @Override - public void addValue(String value) throws Exception { - if (matchPattern == null) { //init match pattern - for (String ptn : DATE_PATTERNS) { - matchPattern = ptn; - SimpleDateFormat sdf = new SimpleDateFormat(ptn); - try { - sdf.parse(value); - isRecognizeFormat = true; - break; - } catch (ParseException e) { - - } - } - sdf = new SimpleDateFormat(matchPattern); - } - if (!isRecognizeFormat) { - throw new IllegalStateException("Date format not match"); - } - try { - sdf.parse(value); - } catch (ParseException e) { - isRecognizeFormat = false; - logger.info("Unrecognized date value: " + value); - } - } - - } - - private static class TimeDictBuilder 
implements IDictionaryReducerLocalBuilder { - - @Override - public Dictionary<String> build(int baseId) { - return new TimeStrDictionary(); - } - - @Override - public void addValue(String value) { - - } - - } - - private static class StringDictBuilder implements IDictionaryReducerLocalBuilder { - - private TrieDictionaryForestBuilder<String> builder; - - public StringDictBuilder(int baseId) { - builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), 0); - } - - @Override - public Dictionary<String> build(int baseId) { - return builder.build(); - } - - @Override - public void addValue(String value) { - builder.addValue(value); - } - - } - - public static class NumberDictBuilder implements IDictionaryReducerLocalBuilder { - - private NumberDictionaryForestBuilder builder; - - public NumberDictBuilder(int baseId) { - builder = new NumberDictionaryForestBuilder(baseId); - } - - @Override - public Dictionary<String> build(int baseId) { - return builder.build(); - } - - @Override - public void addValue(String value) { - builder.addValue(value); - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 7adc262..b2a3664 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -36,8 +36,11 @@ import org.slf4j.LoggerFactory; public class GlobalDictionaryBuilder implements IDictionaryBuilder { private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); + AppendTrieDictionary.Builder<String> builder; + int baseId; + @Override - public Dictionary<String> 
build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException { + public void init(DictionaryInfo dictInfo, int baseId) throws IOException { if (dictInfo == null) { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } @@ -55,28 +58,31 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { } } - AppendTrieDictionary.Builder<String> builder; if (appendDicts.isEmpty()) { logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir()); - builder = AppendTrieDictionary.Builder.create(dictDir); + this.builder = AppendTrieDictionary.Builder.create(dictDir); } else if (appendDicts.size() == 1) { logger.info("GlobalDict {} exist, append value", appendDicts.get(0)); AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); - builder = AppendTrieDictionary.Builder.create(dict); + this.builder = AppendTrieDictionary.Builder.create(dict); } else { throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size())); } - - String value; - while (valueEnumerator.moveNext()) { - value = valueEnumerator.current(); - if (value == null) { - continue; - } - builder.addValue(value); - if (returnSamples.size() < nSamples && returnSamples.contains(value) == false) - returnSamples.add(value); - } + + this.baseId = baseId; + } + + @Override + public boolean addValue(String value) { + if (value == null) + return false; + + builder.addValue(value); + return true; + } + + @Override + public Dictionary<String> build() throws IOException { return builder.build(baseId); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java 
---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java index 8f95a2a..0934a7d 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java @@ -19,11 +19,20 @@ package org.apache.kylin.dict; import java.io.IOException; -import java.util.ArrayList; import org.apache.kylin.common.util.Dictionary; +/** + * A once-only builder for a dictionary. + */ public interface IDictionaryBuilder { - Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException; + /** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. */ + void init(DictionaryInfo info, int baseId) throws IOException; + + /** Adds a new value into the dictionary; returns whether it is accepted (not null) or not. */ + boolean addValue(String value); + + /** Build the dictionary */ + Dictionary<String> build() throws IOException; } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java deleted file mode 100644 index 19b1d28..0000000 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryReducerLocalBuilder.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict; - -import org.apache.kylin.common.util.Dictionary; - -/** - * Created by xiefan on 16-11-16. - */ -public interface IDictionaryReducerLocalBuilder { - Dictionary<String> build(int baseId) throws Exception; - - void addValue(String value) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java index 0225737..a4aee76 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java @@ -1,26 +1,20 @@ package org.apache.kylin.dict; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.TblColRef; -import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; -import java.io.BufferedOutputStream; -import 
java.io.BufferedWriter; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.lang.reflect.ParameterizedType; import java.util.Arrays; import java.util.Iterator; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.datatype.DataType; +import org.junit.Test; /** * Created by xiefan on 16-11-23. @@ -28,53 +22,48 @@ import static org.junit.Assert.fail; public class DictionaryProviderTest { @Test - public void testReadWrite() throws Exception{ + public void testReadWrite() throws Exception { //string dict - Dictionary<String> dict = getDict(DataType.getType("string"), - Arrays.asList(new String[]{"a","b"}).iterator()); + Dictionary<String> dict = getDict(DataType.getType("string"), Arrays.asList(new String[] { "a", "b" }).iterator()); readWriteTest(dict); //number dict - Dictionary<String> dict2 = getDict(DataType.getType("long"), - Arrays.asList(new String[]{"1","2"}).iterator()); + Dictionary<String> dict2 = getDict(DataType.getType("long"), Arrays.asList(new String[] { "1", "2" }).iterator()); readWriteTest(dict2); //date dict - Dictionary<String> dict3 = getDict(DataType.getType("datetime"), - Arrays.asList(new String[]{"20161122","20161123"}).iterator()); + Dictionary<String> dict3 = getDict(DataType.getType("datetime"), Arrays.asList(new String[] { "20161122", "20161123" }).iterator()); readWriteTest(dict3); //date dict - Dictionary<String> dict4 = getDict(DataType.getType("datetime"), - Arrays.asList(new String[]{"2016-11-22","2016-11-23"}).iterator()); + Dictionary<String> dict4 = getDict(DataType.getType("datetime"), Arrays.asList(new String[] { "2016-11-22", "2016-11-23" }).iterator()); readWriteTest(dict4); //date dict try { - 
Dictionary<String> dict5 = getDict(DataType.getType("date"), - Arrays.asList(new String[]{"2016-11-22", "20161122"}).iterator()); + Dictionary<String> dict5 = getDict(DataType.getType("date"), Arrays.asList(new String[] { "2016-11-22", "20161122" }).iterator()); readWriteTest(dict5); fail("Date format not correct.Should throw exception"); - }catch (IllegalStateException e){ + } catch (IllegalArgumentException e) { //correct } } @Test - public void testReadWriteTime(){ + public void testReadWriteTime() { System.out.println(Long.MAX_VALUE); System.out.println(Long.MIN_VALUE); } - - private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception{ - IDictionaryReducerLocalBuilder builder = DictionaryReducerLocalGenerator.getBuilder(type); - while(values.hasNext()){ + private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception { + IDictionaryBuilder builder = DictionaryGenerator.newDictionaryBuilder(type); + builder.init(null, 0); + while (values.hasNext()) { builder.addValue(values.next()); } - return builder.build(0); + return builder.build(); } - private void readWriteTest(Dictionary<String> dict) throws Exception{ + private void readWriteTest(Dictionary<String> dict) throws Exception { final String path = "src/test/resources/dict/tmp_dict"; File f = new File(path); f.deleteOnExit(); @@ -93,15 +82,9 @@ public class DictionaryProviderTest { String dictClassName2 = in.readUTF(); dict2 = (Dictionary<String>) ClassUtil.newInstance(dictClassName2); dict2.readFields(in); - }catch(IOException e){ - e.printStackTrace(); - }finally { - if(in != null){ - try { - in.close(); - } catch (IOException e) { - e.printStackTrace(); - } + } finally { + if (in != null) { + in.close(); } } assertTrue(dict.equals(dict2)); http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 63005f9..4985503 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -18,8 +18,10 @@ package org.apache.kylin.engine.mr.steps; +import java.io.IOException; + import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,16 +38,8 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; -import java.io.IOException; - -/** - * @author ysong1 - */ - public class CreateDictionaryJob extends AbstractHadoopJob { - private int returnCode = 0; - @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -68,38 +62,28 @@ public class CreateDictionaryJob extends AbstractHadoopJob { }, new DictionaryProvider() { @Override - public Dictionary<String> getDictionary(TblColRef col) { - if (!config.isReducerLocalBuildDict()) { + public Dictionary<String> getDictionary(TblColRef col) throws IOException { + Path colDir = new Path(factColumnsInputPath, col.getName()); + Path dictFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); + FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString()); + if (fs.exists(dictFile) == false) return null; - } + FSDataInputStream is = null; try { - Path colDir = new Path(factColumnsInputPath, col.getName()); - Path outputFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); - Configuration conf = HadoopUtil.getCurrentConfiguration(); - FileSystem 
fs = HadoopUtil.getFileSystem(outputFile.getName()); - is = fs.open(outputFile); + is = fs.open(dictFile); String dictClassName = is.readUTF(); Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName); dict.readFields(is); - logger.info("DictionaryProvider read dict form file : " + outputFile.getName()); + logger.info("DictionaryProvider read dict from file: " + dictFile); return dict; - } catch (Exception e) { - e.printStackTrace(); - return null; } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } + IOUtils.closeQuietly(is); } } }); - return returnCode; + return 0; } public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 5511626..8933ee2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -43,14 +41,13 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.DictionaryReducerLocalGenerator; -import 
org.apache.kylin.dict.IDictionaryReducerLocalBuilder; +import org.apache.kylin.dict.DictionaryGenerator; +import org.apache.kylin.dict.IDictionaryBuilder; import org.apache.kylin.engine.mr.KylinReducer; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +59,8 @@ import com.google.common.collect.Maps; */ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> { + protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); + private List<TblColRef> columnList; private String statisticsOutput = null; private List<Long> baseCuboidRowCountInMappers; @@ -75,19 +74,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private boolean isStatistics = false; private KylinConfig cubeConfig; private int uhcReducerCount; - private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>(); + private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>(); private int taskId; - protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); - //local build dict private boolean isReducerLocalBuildDict; - private IDictionaryReducerLocalBuilder builder; - private FastDateFormat dateFormat; + private IDictionaryBuilder builder; private long timeMaxValue = Long.MIN_VALUE; private long timeMinValue = Long.MAX_VALUE; - public static final String DICT_FILE_POSTFIX = ".RLD"; - public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI"; + public static final String DICT_FILE_POSTFIX = ".rldict"; + public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci"; 
private boolean isPartitionCol = false; @Override @@ -121,43 +117,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK isPartitionCol = true; col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); colValues = Lists.newLinkedList(); - DataType partitionColType = col.getType(); - if (partitionColType.isDate()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); - } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); - } else if (partitionColType.isStringFamily()) { - String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat(); - if (StringUtils.isEmpty(partitionDateFormat)) { - partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; - } - dateFormat = DateFormat.getDateFormat(partitionDateFormat); - } else { - throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); - } } else { - // col + // normal col isStatistics = false; - col = columnList.get(ReducerIdToColumnIndex.get(taskId)); + col = columnList.get(reducerIdToColumnIndex.get(taskId)); colValues = Lists.newLinkedList(); + + // local build dict + isReducerLocalBuildDict = config.isReducerLocalBuildDict(); + if (col != null && isReducerLocalBuildDict) { + builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); + builder.init(null, 0); + } } - - //local build dict - isReducerLocalBuildDict = config.isReducerLocalBuildDict(); - if (col != null && isReducerLocalBuildDict) { - builder = DictionaryReducerLocalGenerator.getBuilder(col.getType()); - } - } private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); int count = 0; for (int i = 0; i < uhcIndex.length; i++) { - ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i); + 
reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i); if (uhcIndex[i] == 1) { for (int j = 1; j < uhcReducerCount; j++) { - ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i); + reducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i); } count++; } @@ -167,7 +149,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK @Override public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text key = skey.getText(); - if (isStatistics == true) { + if (isStatistics) { // for hll long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); for (Text value : values) { @@ -187,20 +169,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK cuboidHLLMap.put(cuboidId, hll); } } + } else if (isPartitionCol) { + // partition col + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + long time = DateFormat.stringToMillis(value); + timeMinValue = Math.min(timeMinValue, time); + timeMaxValue = Math.max(timeMaxValue, time); } else { + // normal col if (isReducerLocalBuildDict) { - String value = new String(key.getBytes(), 1, key.getLength() - 1); - //partition col - try { - if (isPartitionCol) { - long time = dateFormat.parse(value).getTime(); - timeMinValue = Math.min(timeMinValue, time); - timeMaxValue = Math.max(timeMaxValue, time); - } - builder.addValue(value); - } catch (Exception e) { - e.printStackTrace(); - } + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + builder.addValue(value); } else { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); if (colValues.size() == 1000000) { //spill every 1 million @@ -210,7 +189,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } } } - } private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws 
IOException { @@ -286,25 +264,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK @Override protected void doCleanup(Context context) throws IOException, InterruptedException { - if (isStatistics == false) { - if (isReducerLocalBuildDict) { - try { - if (isPartitionCol) { - outputPartitionInfo(context); - } - Dictionary<String> dict = builder.build(0); - outputDict(col, dict, context); - } catch (Exception e) { - e.printStackTrace(); - } - } else { - if (colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); - } - } - } else { - //output the hll info; + if (isStatistics) { + // output the hll info long grandTotal = 0; for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { grandTotal += hll.getCountEstimate(); @@ -316,6 +277,20 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK writeMapperAndCuboidStatistics(context); // for human check CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); + } else if (isPartitionCol) { + // partition col + outputPartitionInfo(context); + } else { + // normal col + if (isReducerLocalBuildDict) { + Dictionary<String> dict = builder.build(); + outputDict(col, dict, context); + } else { + if (colValues.size() > 0) { + outputDistinctValues(col, colValues, context); + colValues.clear(); + } + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 762047b..a5c8fc0 100644 --- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -97,15 +97,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap // if partition col not on cube, no need needFetchPartitionCol = false; } else { - for (int x : dictionaryColumnIndex) { - if (x == partitionColumnIndex) { - // if partition col already build dict, no need - needFetchPartitionCol = false; - break; - } - } + needFetchPartitionCol = true; } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/30cb1ac6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 977196c..d3becfe 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -84,8 +83,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); Path colDir = new Path(factColumnsInputPath, partitionCol.getName()); Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); - Configuration conf = HadoopUtil.getCurrentConfiguration(); - FileSystem fs = 
HadoopUtil.getFileSystem(outputFile.getName()); + FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString()); FSDataInputStream is = null; long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE; try { @@ -94,8 +92,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { long max = is.readLong(); minValue = Math.min(min, minValue); maxValue = Math.max(max, maxValue); - } catch (IOException e) { - throw new IOException(e); } finally { IOUtils.closeQuietly(is); }
