This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new e9ccd07 KYLIN-3520 Deal with NULL values of measures for inmem cubing e9ccd07 is described below commit e9ccd0711d46b5db1d2b48b89f2d1be67f949632 Author: Zhong <nju_y...@apache.org> AuthorDate: Wed Aug 29 17:31:27 2018 +0800 KYLIN-3520 Deal with NULL values of measures for inmem cubing --- .../inmemcubing/InputConverterUnitForRawData.java | 76 ++------------- .../apache/kylin/cube/util/KeyValueBuilder.java | 106 +++++++++++++++++++++ .../kylin/engine/mr/common/BaseCuboidBuilder.java | 93 ++++-------------- 3 files changed, 132 insertions(+), 143 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java index 2ff7ee0..f3e45bc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java @@ -17,25 +17,20 @@ */ package org.apache.kylin.cube.inmemcubing; -import java.util.List; import java.util.Map; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.cube.util.KeyValueBuilder; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - /** */ public class InputConverterUnitForRawData implements InputConverterUnit<String[]> { @@ -51,7 +46,7 @@ 
public class InputConverterUnitForRawData implements InputConverterUnit<String[] private final MeasureIngester<?>[] measureIngesters; private final int measureCount; private final Map<TblColRef, Dictionary<String>> dictionaryMap; - protected List<byte[]> nullBytes; + private final KeyValueBuilder kvBuilder; public InputConverterUnitForRawData(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { @@ -60,12 +55,12 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[] this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures()); this.dictionaryMap = dictionaryMap; - initNullBytes(cubeDesc); + this.kvBuilder = new KeyValueBuilder(this.flatDesc); } @Override public final void convert(String[] row, GTRecord record) { - Object[] dimensions = buildKey(row); + Object[] dimensions = kvBuilder.buildKey(row); Object[] metricsValues = buildValue(row); Object[] recordValues = new Object[dimensions.length + metricsValues.length]; System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length); @@ -93,20 +88,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[] return CUT_ROW; } - private Object[] buildKey(String[] row) { - int keySize = flatDesc.getRowKeyColumnIndexes().length; - Object[] key = new Object[keySize]; - - for (int i = 0; i < keySize; i++) { - key[i] = row[flatDesc.getRowKeyColumnIndexes()[i]]; - if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) { - key[i] = null; - } - } - - return key; - } - @Override public boolean ifChange() { return true; @@ -115,53 +96,10 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[] private Object[] buildValue(String[] row) { Object[] values = new Object[measureCount]; for (int i = 0; i < measureCount; i++) { - values[i] = buildValueOf(i, row); + String[] colValues = 
kvBuilder.buildValueOf(i, row); + MeasureDesc measure = measureDescs[i]; + values[i] = measureIngesters[i].valueOf(colValues, measure, dictionaryMap); } return values; } - - private Object buildValueOf(int idxOfMeasure, String[] row) { - MeasureDesc measure = measureDescs[idxOfMeasure]; - FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure]; - - int paramCount = function.getParameterCount(); - String[] inputToMeasure = new String[paramCount]; - - // pick up parameter values - ParameterDesc param = function.getParameter(); - int paramColIdx = 0; // index among parameters of column type - for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { - String value; - if (function.isCount()) { - value = "1"; - } else if (param.isColumnType()) { - value = row[colIdxOnFlatTable[paramColIdx++]]; - } else { - value = param.getValue(); - } - inputToMeasure[i] = value; - } - - return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); - } - - private void initNullBytes(CubeDesc cubeDesc) { - nullBytes = Lists.newArrayList(); - String[] nullStrings = cubeDesc.getNullStrings(); - if (nullStrings != null) { - for (String s : nullStrings) { - nullBytes.add(Bytes.toBytes(s)); - } - } - } - - private boolean isNull(byte[] v) { - for (byte[] nullByte : nullBytes) { - if (Bytes.equals(v, nullByte)) - return true; - } - return false; - } - } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java new file mode 100644 index 0000000..0ba4fd8 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KeyValueBuilder.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.util; + +import java.io.Serializable; +import java.util.List; +import java.util.Set; + +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.ParameterDesc; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class KeyValueBuilder implements Serializable { + + public static final String HIVE_NULL = "\\N"; + + private Set<String> nullStrs; + private CubeJoinedFlatTableEnrich flatDesc; + private CubeDesc cubeDesc; + + public KeyValueBuilder(CubeJoinedFlatTableEnrich intermediateTableDesc) { + flatDesc = intermediateTableDesc; + cubeDesc = flatDesc.getCubeDesc(); + initNullStrings(); + } + + private void initNullStrings() { + nullStrs = Sets.newHashSet(); + nullStrs.add(HIVE_NULL); + String[] nullStrings = cubeDesc.getNullStrings(); + if (nullStrings != null) { + for (String s : nullStrings) { + nullStrs.add(s); + } + } + } + + protected boolean isNull(String v) { + return nullStrs.contains(v); + } + + private String getCell(int i, String[] flatRow) { + if (isNull(flatRow[i])) + return null; + else + return flatRow[i]; + } + + public String[] buildKey(String[] row) { + int keySize = 
flatDesc.getRowKeyColumnIndexes().length; + String[] key = new String[keySize]; + + for (int i = 0; i < keySize; i++) { + key[i] = getCell(flatDesc.getRowKeyColumnIndexes()[i], row); + } + + return key; + } + + public String[] buildValueOf(int idxOfMeasure, String[] row) { + MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); + FunctionDesc function = measure.getFunction(); + int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure]; + + int paramCount = function.getParameterCount(); + List<String> inputToMeasure = Lists.newArrayListWithExpectedSize(paramCount); + + // pick up parameter values + ParameterDesc param = function.getParameter(); + int colParamIdx = 0; // index among parameters of column type + for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { + String value; + if (function.isCount()) { + value = "1"; + } else if (param.isColumnType()) { + value = getCell(colIdxOnFlatTable[colParamIdx++], row); + } else { + value = param.getValue(); + } + inputToMeasure.add(value); + } + + return inputToMeasure.toArray(new String[inputToMeasure.size()]); + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index 13bc688..9322162 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.common; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; @@ -30,35 +29,31 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import 
org.apache.kylin.cube.util.KeyValueBuilder; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; - /** */ @SuppressWarnings("serial") public class BaseCuboidBuilder implements java.io.Serializable { protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class); - protected String cubeName; - protected Cuboid baseCuboid; + protected KylinConfig kylinConfig; protected CubeDesc cubeDesc; protected CubeSegment cubeSegment; - protected Set<String> nullStrs; protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected MeasureIngester<?>[] aggrIngesters; protected Map<TblColRef, Dictionary<String>> dictionaryMap; protected AbstractRowKeyEncoder rowKeyEncoder; + protected List<MeasureDesc> measureDescList; protected BufferedMeasureCodec measureCodec; + protected KeyValueBuilder kvBuilder; - protected KylinConfig kylinConfig; public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc, AbstractRowKeyEncoder rowKeyEncoder, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) { @@ -66,12 +61,14 @@ public class BaseCuboidBuilder implements java.io.Serializable { this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; this.intermediateTableDesc = intermediateTableDesc; + this.dictionaryMap = dictionaryMap; this.rowKeyEncoder = rowKeyEncoder; this.aggrIngesters = aggrIngesters; - this.dictionaryMap = dictionaryMap; - init(); - measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); + measureDescList = cubeDesc.getMeasures(); + measureCodec = new 
BufferedMeasureCodec(measureDescList); + + kvBuilder = new KeyValueBuilder(intermediateTableDesc); } public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, @@ -82,39 +79,19 @@ public class BaseCuboidBuilder implements java.io.Serializable { this.intermediateTableDesc = intermediateTableDesc; this.dictionaryMap = dictionaryMap; - init(); + Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); - measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); - aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - } - private void init() { - baseCuboid = Cuboid.getBaseCuboid(cubeDesc); - initNullBytes(); - } + measureDescList = cubeDesc.getMeasures(); + aggrIngesters = MeasureIngester.create(measureDescList); + measureCodec = new BufferedMeasureCodec(measureDescList); - private void initNullBytes() { - nullStrs = Sets.newHashSet(); - String[] nullStrings = cubeDesc.getNullStrings(); - if (nullStrings != null) { - for (String s : nullStrings) { - nullStrs.add(s); - } - } - } - - protected boolean isNull(String v) { - return nullStrs.contains(v); + kvBuilder = new KeyValueBuilder(intermediateTableDesc); } public byte[] buildKey(String[] flatRow) { - int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); - List<TblColRef> columns = baseCuboid.getColumns(); - String[] colValues = new String[columns.size()]; - for (int i = 0; i < columns.size(); i++) { - colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow); - } - return rowKeyEncoder.encode(colValues); + String[] colKeys = kvBuilder.buildKey(flatRow); + return rowKeyEncoder.encode(colKeys); } public ByteBuffer buildValue(String[] flatRow) { @@ -124,7 +101,9 @@ public class BaseCuboidBuilder implements java.io.Serializable { public Object[] buildValueObjects(String[] flatRow) { Object[] measures = new Object[cubeDesc.getMeasures().size()]; for (int i = 0; i < 
measures.length; i++) { - measures[i] = buildValueOf(i, flatRow); + String[] colValues = kvBuilder.buildValueOf(i, flatRow); + MeasureDesc measure = measureDescList.get(i); + measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap); } return measures; @@ -135,38 +114,4 @@ public class BaseCuboidBuilder implements java.io.Serializable { aggrIngesters[i].reset(); } } - - private Object buildValueOf(int idxOfMeasure, String[] flatRow) { - MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); - FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; - - int paramCount = function.getParameterCount(); - String[] inputToMeasure = new String[paramCount]; - - // pick up parameter values - ParameterDesc param = function.getParameter(); - int colParamIdx = 0; // index among parameters of column type - for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { - String value; - if (function.isCount()) { - value = "1"; - } else if (param.isColumnType()) { - value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow); - } else { - value = param.getValue(); - } - inputToMeasure[i] = value; - } - - return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); - } - - private String getCell(int i, String[] flatRow) { - if (isNull(flatRow[i])) - return null; - else - return flatRow[i]; - } - }