This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 637f45d8444c7b52713780c1701d33d6656fffc0 Author: Zhong <[email protected]> AuthorDate: Wed Aug 15 17:40:54 2018 +0800 KYLIN-3491 add a shrunken global dictionary step to improve the encoding process Signed-off-by: shaofengshi <[email protected]> --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/cube/model/CubeDesc.java | 4 + .../org/apache/kylin/dict/ShrunkenDictionary.java | 159 +++++++++++++++++++++ .../kylin/dict/ShrunkenDictionaryBuilder.java | 49 +++++++ .../apache/kylin/dict/ShrunkenDictionaryTest.java | 77 ++++++++++ .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/engine/mr/BatchCubingJobBuilder2.java | 10 ++ .../java/org/apache/kylin/engine/mr/IMRInput.java | 4 + .../apache/kylin/engine/mr/JobBuilderSupport.java | 22 +++ .../kylin/engine/mr/common/AbstractHadoopJob.java | 3 + .../kylin/engine/mr/common/BaseCuboidBuilder.java | 6 +- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../engine/mr/common/DictionaryGetterUtil.java | 76 ++++++++++ .../engine/mr/steps/BaseCuboidMapperBase.java | 10 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 5 + ...ob.java => ExtractDictionaryFromGlobalJob.java} | 108 ++++++-------- .../steps/ExtractDictionaryFromGlobalMapper.java | 141 ++++++++++++++++++ .../kylin/engine/mr/steps/InMemCuboidJob.java | 5 + .../engine/mr/steps/InMemCuboidMapperBase.java | 15 +- .../org/apache/kylin/source/hive/HiveMRInput.java | 9 ++ .../apache/kylin/source/kafka/KafkaMRInput.java | 7 + .../kylin/storage/hbase/steps/HBaseJobSteps.java | 1 + 22 files changed, 633 insertions(+), 84 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 925488f..58d9caa 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -430,6 +430,10 @@ abstract public class KylinConfigBase implements Serializable { return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); } + public boolean isShrunkenDictFromGlobalEnabled() { + return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", "false")); + } + // ============================================================================ // CUBE // ============================================================================ diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 95c8b40..15d67ea 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -1480,6 +1480,10 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } return globalDictCols; } + + public boolean isShrunkenDictFromGlobalEnabled() { + return config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty(); + } // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns public List<TblColRef> getAllUHCColumns() { diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java new file mode 100644 index 0000000..35c995e --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java @@ -0,0 +1,159 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +public class ShrunkenDictionary<T> extends Dictionary<T> { + + private ImmutableMap<T, Integer> valueToIdMap; + private ImmutableMap<Integer, T> idToValueMap; + + private int minId; + private int maxId; + private int sizeOfId; + private int sizeOfValue; + + private ValueSerializer<T> valueSerializer; + + public ShrunkenDictionary(ValueSerializer<T> valueSerializer) { // default constructor for Writable interface + this.valueSerializer = valueSerializer; + } + + public ShrunkenDictionary(ValueSerializer<T> valueSerializer, int minId, int maxId, int sizeOfId, int sizeOfValue, + Map<T, Integer> valueToIdMap) { + this.valueSerializer = valueSerializer; + + this.minId = minId; + this.maxId = maxId; + this.sizeOfId = sizeOfId; + this.sizeOfValue = sizeOfValue; + + Preconditions.checkNotNull(valueToIdMap); + this.valueToIdMap = ImmutableMap.<T, Integer> builder().putAll(valueToIdMap).build(); + } + + @Override + public int getMinId() { + return minId; + } + + @Override + public int getMaxId() { + return maxId; + } + + @Override + public int getSizeOfId() { + return sizeOfId; + } + + @Override + public int getSizeOfValue() { + return sizeOfValue; + } + + @Override + public boolean contains(Dictionary<?> another) { + return false; + } + + protected int getIdFromValueImpl(T value, int roundingFlag) { + Integer id = valueToIdMap.get(value); + if (id == null) { + return -1; + } + return id; + } + + protected T getValueFromIdImpl(int id) { + if (idToValueMap == null) { + idToValueMap = buildIdToValueMap(); + } + return idToValueMap.get(id); + } + + private ImmutableMap<Integer, T> buildIdToValueMap() { + ImmutableMap.Builder<Integer, T> idToValueMapBuilder = ImmutableMap.builder(); + for (T value : valueToIdMap.keySet()) { + idToValueMapBuilder.put(valueToIdMap.get(value), value); + } + return idToValueMapBuilder.build(); + } + + public void dump(PrintStream out) { + out.println(String.format("Total %d values for ShrunkenDictionary", valueToIdMap.size())); + } + + public void write(DataOutput out) throws IOException { + out.writeInt(minId); + out.writeInt(maxId); + out.writeInt(sizeOfId); + out.writeInt(sizeOfValue); + + out.writeInt(valueToIdMap.size()); + for (T value : valueToIdMap.keySet()) { + valueSerializer.writeValue(out, value); + out.writeInt(valueToIdMap.get(value)); + } + } + + public void readFields(DataInput in) throws IOException { + this.minId = in.readInt(); + this.maxId = in.readInt(); + 
this.sizeOfId = in.readInt(); + this.sizeOfValue = in.readInt(); + + int sizeValueMap = in.readInt(); + ImmutableMap.Builder<T, Integer> valueToIdMapBuilder = ImmutableMap.builder(); + for (int i = 0; i < sizeValueMap; i++) { + T value = valueSerializer.readValue(in); + int id = in.readInt(); + valueToIdMapBuilder.put(value, id); + } + this.valueToIdMap = valueToIdMapBuilder.build(); + } + + public interface ValueSerializer<T> { + void writeValue(DataOutput out, T value) throws IOException; + + T readValue(DataInput in) throws IOException; + } + + public static class StringValueSerializer implements ValueSerializer<String> { + @Override + public void writeValue(DataOutput out, String value) throws IOException { + out.writeUTF(value); + } + + @Override + public String readValue(DataInput in) throws IOException { + return in.readUTF(); + } + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java new file mode 100644 index 0000000..ab3df5e --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.dict.ShrunkenDictionary.ValueSerializer; + +import com.google.common.collect.Maps; + +public class ShrunkenDictionaryBuilder<T> { + + private Map<T, Integer> valueToIdMap; + + private Dictionary<T> fullDict; + + public ShrunkenDictionaryBuilder(Dictionary<T> fullDict) { + this.fullDict = fullDict; + + this.valueToIdMap = Maps.newHashMap(); + } + + public void addValue(T value) { + int id = fullDict.getIdFromValue(value); + valueToIdMap.put(value, id); + } + + public ShrunkenDictionary<T> build(ValueSerializer<T> valueSerializer) { + return new ShrunkenDictionary<>(valueSerializer, fullDict.getMinId(), fullDict.getMaxId(), + fullDict.getSizeOfId(), fullDict.getSizeOfValue(), valueToIdMap); + } +} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java new file mode 100644 index 0000000..7a86e5f --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.dict; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.kylin.common.util.Dictionary; +import org.junit.Assert; +import org.junit.Test; + +public class ShrunkenDictionaryTest { + + @Test + public void testStringDictionary() { + ArrayList<String> strList = new ArrayList<String>(); + strList.add(""); + strList.add("part"); + strList.add("par"); + strList.add("partition"); + strList.add("party"); + strList.add("parties"); + strList.add("paint"); + + TrieDictionaryBuilder<String> dictBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter()); + for (String str : strList) { + dictBuilder.addValue(str); + } + Dictionary<String> dict = dictBuilder.build(0); + + ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); + ShrunkenDictionaryBuilder<String> shrunkenDictBuilder = new ShrunkenDictionaryBuilder<>(dict); + for (int i = 0; i < strList.size(); i += 2) { + shrunkenDictBuilder.addValue(strList.get(i)); + } + Dictionary<String> shrunkenDict = shrunkenDictBuilder.build(valueSerializer); + + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + + shrunkenDict.write(dos); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + + Dictionary<String> dShrunkenDict = new ShrunkenDictionary<>(valueSerializer); + dShrunkenDict.readFields(dis); + + for (int i = 0; i < strList.size(); i += 2) { + String value = strList.get(i); + Assert.assertEquals(dict.getIdFromValue(value), dShrunkenDict.getIdFromValue(value)); + } + } catch (IOException e) { + } + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index c805f8a..560293c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -34,6 +34,7 @@ public final class ExecutableConstants { public static final String SOURCE_RECORDS_COUNT = "source_records_count"; public static final String SOURCE_RECORDS_SIZE = "source_records_size"; + public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary"; public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 5498365..1695a22 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -73,6 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { outputSide.addStepPhase2_BuildDictionary(result); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + result.addTask(createExtractDictionaryFromGlobalJob(jobId)); + } + // Phase 3: Build Cube addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute @@ -124,6 +128,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId)); + } cubeStep.setMapReduceParams(cmd.toString()); cubeStep.setMapReduceJobClass(getInMemCuboidJob()); @@ -150,6 +157,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) { + appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId)); + } baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index aca9853..f650321 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr; import java.util.Collection; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -50,6 +51,9 @@ public interface IMRInput { /** Parse a mapper input object into column values. 
*/ public Collection<String[]> parseMapperInput(Object mapperInput); + + /** Get the signature for the input split*/ + public String getInputSplitSignature(InputSplit inputSplit); } /** diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index c6abf16..02e9fe5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -36,6 +36,7 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.ExtractDictionaryFromGlobalJob; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; import org.apache.kylin.engine.mr.steps.MergeDictionaryStep; import org.apache.kylin.engine.mr.steps.MergeStatisticsStep; @@ -175,6 +176,23 @@ public class JobBuilderSupport { return buildDictionaryStep; } + public MapReduceExecutable createExtractDictionaryFromGlobalJob(String jobId) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL); + result.setMapReduceJobClass(ExtractDictionaryFromGlobalJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_Extract_Dictionary_from_Global_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getShrunkenDictionaryPath(jobId)); + + result.setMapReduceParams(cmd.toString()); + return result; + } + public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) { final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); @@ -291,6 +309,10 @@ public class JobBuilderSupport { return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS; } + public String getShrunkenDictionaryPath(String jobId) { + return getRealizationRootPath(jobId) + "/dictionary_shrunken"; + } + public String getDictRootPath(String jobId) { return getRealizationRootPath(jobId) + "/dict"; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 2976080..8873f30 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -112,6 +112,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME) .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); + protected 
static final Option OPTION_DICTIONARY_SHRUNKEN_PATH = OptionBuilder + .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false) + .withDescription("Dictionary shrunken path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH); protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT) .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index 5dd55b2..13bc688 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -74,18 +74,18 @@ public class BaseCuboidBuilder implements java.io.Serializable { measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); } - public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) { + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, + CubeJoinedFlatTableEnrich intermediateTableDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { this.kylinConfig = kylinConfig; this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; this.intermediateTableDesc = intermediateTableDesc; + this.dictionaryMap = dictionaryMap; init(); rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - dictionaryMap = cubeSegment.buildDictionaryMap(); - } private void init() { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 6fe55e2..8c2ba7f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -105,6 +105,7 @@ public interface BatchConstants { String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots"; String ARG_META_URL = "metadataUrl"; String ARG_HBASE_CONF_PATH = "hbaseConfPath"; + String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath"; /** * logger and counter */ diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java new file mode 100644 index 0000000..0895244 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.common; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.dict.ShrunkenDictionary; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DictionaryGetterUtil { + + private static final Logger logger = LoggerFactory.getLogger(DictionaryGetterUtil.class); + + public static String getInputSplitSignature(CubeSegment cubeSegment, InputSplit inputSplit) { + return MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat().getInputSplitSignature(inputSplit); + } + + public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, InputSplit inputSplit, + Configuration configuration) throws IOException { + Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap(); + + String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH); + if (shrunkenDictPath == null) { + return dictionaryMap; + } + + // replace global dictionary with shrunken dictionary if possible + String inputSplitSignature = getInputSplitSignature(cubeSegment, inputSplit); + FileSystem fs = FileSystem.get(configuration); + ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer(); + for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumns()) { + Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity()); + Path colShrunkenDictPath = new Path(colShrunkenDictDir, inputSplitSignature); + if (!fs.exists(colShrunkenDictPath)) { + logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split " + + inputSplitSignature + " does not exist!!!"); + continue; + } + try (DataInputStream dis = fs.open(colShrunkenDictPath)) { + Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer); + shrunkenDict.readFields(dis); + + dictionaryMap.put(colRef, shrunkenDict); + } + } + + return dictionaryMap; + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 091f9a2..b5dc961 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -35,6 +37,8 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; import 
org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +70,12 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K cubeDesc = cube.getDescriptor(); cubeSegment = cube.getSegmentById(segmentID); CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc); + Map<TblColRef, Dictionary<String>> dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, + context.getInputSplit(), context.getConfiguration()); + + baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, + dictionaryMap); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index b49b639..d7da2c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -93,6 +93,7 @@ public class CuboidJob extends AbstractHadoopJob { options.addOption(OPTION_NCUBOID_LEVEL); options.addOption(OPTION_CUBING_JOB_ID); options.addOption(OPTION_CUBOID_MODE); + options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH); parseOptions(options, args); String output = getOptionValue(OPTION_OUTPUT_PATH); @@ -118,6 +119,10 @@ public class CuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); + String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH); + if (shrunkenDictPath != null) { + job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath); + } logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java similarity index 54% copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java index b0ea7b7..df61ca9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java @@ -6,59 +6,44 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.engine.mr.steps; import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.ByteArrayWritable; -import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; -import org.apache.kylin.engine.mr.IMROutput2; +import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.job.execution.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * Directly using the global dictionary to encode values brings lots of memory swapping of its slices, which makes + * the encoding process very slow. This job changes the encoding process for the raw column values as follows: + * 1. For each data block, a mapper generates the distinct values, sorts them, and extracts a shrunken dictionary from the global one + * 2. For each data block, scan again to encode the raw values with the shrunken dictionary rather than the global one */ -public class InMemCuboidJob extends AbstractHadoopJob { - - protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class); - - private boolean skipped = false; - - @Override - public boolean isSkipped() { - return skipped; - } - - private boolean checkSkip(String cubingJobId) { - if (cubingJobId == null) - return false; - - ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId); - skipped = cubingJob.isInMemCubing() == false; - return skipped; - } +public class ExtractDictionaryFromGlobalJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(ExtractDictionaryFromGlobalJob.class); @Override public int run(String[] args) throws Exception { @@ -66,68 +51,57 @@ public class InMemCuboidJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_ID); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_CUBING_JOB_ID); parseOptions(options, args); - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); + + String cubeName = getOptionValue(OPTION_CUBE_NAME); String segmentID = getOptionValue(OPTION_SEGMENT_ID); - String output = getOptionValue(OPTION_OUTPUT_PATH); + // ---------------------------------------------------------------------------- + // add metadata to distributed cache CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); CubeSegment segment =
cube.getSegmentById(segmentID); - String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID); - if (checkSkip(cubingJobId)) { - logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment); - return 0; - } + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); logger.info("Starting: " + job.getJobName()); + job.getConfiguration().set("mapreduce.map.speculative", "false"); setJobClasspath(job, cube.getConfig()); - // add metadata to distributed cache - attachSegmentMetadataWithAll(segment, job.getConfiguration()); - - // set job configuration - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); + // Mapper + job.setMapperClass(ExtractDictionaryFromGlobalMapper.class); - // set mapper - job.setMapperClass(InMemCuboidMapper.class); - job.setMapOutputKeyClass(ByteArrayWritable.class); - job.setMapOutputValueClass(ByteArrayWritable.class); + // Reducer + job.setNumReduceTasks(0); - // set reducer - // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade - job.setReducerClass(InMemCuboidReducer.class); + // Input + IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment) + .getFlatTableInputFormat(); + flatTableInputFormat.configureJob(job); + // Output + //// prevent to create zero-sized default output + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + FileOutputFormat.setOutputPath(job, output); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); + deletePath(job.getConfiguration(), output); - // set input - IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat(); - flatTableInputFormat.configureJob(job); - - // set output - IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat(); - outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0); - + attachSegmentMetadataWithDict(segment, job.getConfiguration()); return waitForCompletion(job); } finally { if (job != null) cleanupTempConfFile(job.getConfiguration()); } } - - public static void main(String[] args) throws Exception { - InMemCuboidJob job = new InMemCuboidJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java new file mode 100644 index 0000000..34a5ec7 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.dict.ShrunkenDictionary; +import org.apache.kylin.dict.ShrunkenDictionaryBuilder; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMapper<KEYIN, Object, Text, Text> { + private String cubeName; + private CubeDesc cubeDesc; + private CubeInstance cube; + private CubeSegment cubeSeg; + + private IMRInput.IMRTableInputFormat flatTableInputFormat; + private CubeJoinedFlatTableEnrich intermediateTableDesc; + + private List<TblColRef> globalColumns; + private int[] globalColumnIndex; + private List<Set<String>> globalColumnValues; + private List<Dictionary<String>> globalDicts; + + private String splitKey; + + @Override + protected void doSetup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + bindCurrentConfiguration(conf); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); + cube = CubeManager.getInstance(config).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); + flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); + + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); + + globalColumns = cubeDesc.getAllGlobalDictColumns(); + globalColumnIndex = new int[globalColumns.size()]; + globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size()); + globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size()); + for (int i = 0; i < globalColumns.size(); i++) { + TblColRef colRef = globalColumns.get(i); + int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); + globalColumnIndex[i] = columnIndexOnFlatTbl; + + globalColumnValues.add(Sets.<String> 
newHashSet()); + globalDicts.add(cubeSeg.getDictionary(colRef)); + } + + splitKey = DictionaryGetterUtil.getInputSplitSignature(cubeSeg, context.getInputSplit()); + } + + @Override + public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { + Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record); + + for (String[] row : rowCollection) { + for (int i = 0; i < globalColumnIndex.length; i++) { + String fieldValue = row[globalColumnIndex[i]]; + if (fieldValue == null) + continue; + + globalColumnValues.get(i).add(fieldValue); + } + } + } + + @Override + protected void doCleanup(Context context) throws IOException, InterruptedException { + FileSystem fs = FileSystem.get(context.getConfiguration()); + Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR)); + + ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer(); + for (int i = 0; i < globalColumns.size(); i++) { + List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i)); + // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices + Collections.sort(colDistinctValues); + + ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(globalDicts.get(i)); + for (String colValue : colDistinctValues) { + dictBuilder.addValue(colValue); + } + Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer); + + Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity()); + if (!fs.exists(colDictDir)) { + fs.mkdirs(colDictDir); + } + try (DataOutputStream dos = fs.create(new Path(colDictDir, splitKey))) { + shrunkenDict.write(dos); + } + } + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index b0ea7b7..f8874fe 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -70,6 +70,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH); parseOptions(options, args); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); @@ -88,6 +89,10 @@ public class InMemCuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); + String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH); + if (shrunkenDictPath != null) { + job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath); + } logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java index 73af138..e95ce8a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java @@ -42,13 +42,12 @@ import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import 
org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil; +import org.apache.kylin.engine.mr.common.DictionaryGetterUtil; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** */ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { @@ -94,17 +93,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> cubeSegment = cube.getSegmentById(segmentID); flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - dictionaryMap = Maps.newHashMap(); - - // dictionary - for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) { - Dictionary<?> dict = cubeSegment.getDictionary(col); - if (dict == null) { - logger.warn("Dictionary for " + col + " was not found."); - } - - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } + dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, context.getInputSplit(), conf); // check memory more often if a single row is big if (cubeDesc.hasMemoryHungryMeasures()) { diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 2e39285..33b1059 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -22,9 +22,12 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hive.hcatalog.mapreduce.HCatSplit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringUtil; @@ -100,6 +103,12 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { return Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput)); } + @Override + public String getInputSplitSignature(InputSplit inputSplit) { + FileSplit baseSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit(); + //file name(for intermediate table) + start pos + length + return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength(); + } } public static class BatchCubingInputSide implements IMRBatchCubingInputSide { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index a45cc63..2c95c1c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -24,6 +24,8 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -100,6 +102,11 @@ public class KafkaMRInput extends 
KafkaInputBase implements IMRInput { return Collections.singletonList(columns); } + @Override + public String getInputSplitSignature(InputSplit inputSplit) { + FileSplit baseSplit = (FileSplit) inputSplit; + return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength(); + } } public static class BatchCubingInputSide implements IMRBatchCubingInputSide { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java index e48090d..4d61d9b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java @@ -250,6 +250,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport { List<String> toDeletePaths = new ArrayList<>(); toDeletePaths.add(getFactDistinctColumnsPath(jobId)); toDeletePaths.add(getHFilePath(jobId)); + toDeletePaths.add(getShrunkenDictionaryPath(jobId)); HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
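
To see how the pieces added by this patch fit together outside of a MapReduce job, here is a minimal sketch that mirrors ShrunkenDictionaryTest and the mapper/getter logic above: it builds a small stand-in "global" dictionary, extracts a shrunken dictionary for the distinct values of one input split, serializes it the way ExtractDictionaryFromGlobalMapper does in doCleanup(), and reads it back the way DictionaryGetterUtil does before encoding. The class name ShrunkenDictionarySketch and the sample values are illustrative only; the real step is enabled by setting kylin.dictionary.shrunken-from-global-enabled=true on a cube that declares global dictionary columns.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dict.ShrunkenDictionary;
import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
import org.apache.kylin.dict.StringBytesConverter;
import org.apache.kylin.dict.TrieDictionaryBuilder;

public class ShrunkenDictionarySketch {

    public static void main(String[] args) throws IOException {
        // Stand-in for the (much larger) global dictionary of one column; in the real flow
        // this is cubeSeg.getDictionary(colRef), not a TrieDictionary built on the spot.
        TrieDictionaryBuilder<String> globalBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
        for (String v : Arrays.asList("paint", "par", "part", "parties", "partition", "party")) {
            globalBuilder.addValue(v);
        }
        Dictionary<String> globalDict = globalBuilder.build(0);

        // Step 1 (ExtractDictionaryFromGlobalMapper): collect the distinct values seen in one
        // input split, sort them, and look up only those values in the global dictionary.
        List<String> splitDistinctValues = Arrays.asList("paint", "part", "party");
        ShrunkenDictionaryBuilder<String> shrunkenBuilder = new ShrunkenDictionaryBuilder<>(globalDict);
        for (String v : splitDistinctValues) {
            shrunkenBuilder.addValue(v);
        }
        ShrunkenDictionary.StringValueSerializer serializer = new ShrunkenDictionary.StringValueSerializer();
        Dictionary<String> shrunkenDict = shrunkenBuilder.build(serializer);

        // The mapper writes one shrunken dictionary per (column, input split) under the job's
        // shrunken-dictionary path; a byte array stands in for the HDFS file here.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        shrunkenDict.write(new DataOutputStream(bos));

        // Step 2 (base/in-mem cuboid mappers via DictionaryGetterUtil): read the shrunken
        // dictionary for the current split and use it in place of the global dictionary.
        Dictionary<String> restored = new ShrunkenDictionary<>(serializer);
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        // Ids are the global dictionary's ids, but each lookup now touches only the small
        // per-split map instead of swapping global dictionary slices in and out of memory.
        System.out.println(globalDict.getIdFromValue("party") == restored.getIdFromValue("party"));
    }
}

Because the shrunken dictionary records the ids assigned by the full dictionary (along with its min/max id and sizes), rows encoded against it stay byte-compatible with rows encoded directly against the global dictionary; only the lookup structure each mapper holds in memory shrinks.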
