Repository: kylin Updated Branches: refs/heads/KYLIN-2314 ab65bd56e -> 8cbd6a6c1
KYLIN-2314 Use col identity instead of col name in dictionary refs Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8cbd6a6c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8cbd6a6c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8cbd6a6c Branch: refs/heads/KYLIN-2314 Commit: 8cbd6a6c14f9639145faa2b8ea728e5d4f510c46 Parents: ab65bd5 Author: Yang Li <[email protected]> Authored: Sat Dec 24 13:14:37 2016 +0800 Committer: Yang Li <[email protected]> Committed: Sat Dec 24 13:14:37 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 2 +- .../java/org/apache/kylin/cube/CubeSegment.java | 30 ++++---- .../kylin/engine/mr/JobBuilderSupport.java | 2 - .../engine/mr/steps/CreateDictionaryJob.java | 5 +- .../engine/mr/steps/CubingExecutableUtil.java | 8 -- .../engine/mr/steps/FactDistinctColumnsJob.java | 3 + .../mr/steps/FactDistinctColumnsReducer.java | 39 ++++++---- .../mr/steps/FactDistinctHiveColumnsMapper.java | 81 ++++++++++---------- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 4 +- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 1 - 10 files changed, 87 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index fe2030a..9670b89 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -240,7 +240,7 @@ public class CubeManager implements IRealizationProvider { if (dictInfo != null) { Dictionary<?> dict = dictInfo.getDictionaryObject(); cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); - cubeSeg.getRowkeyStats().add(new Object[] { col.getName(), dict.getSize(), dict.getSizeOfId() }); + cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance()); update.setToUpdateSegs(cubeSeg); http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index e155f86..36a6044 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -98,9 +98,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen @JsonProperty("snapshots") private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path - @JsonProperty("index_path") - private String indexPath; - @JsonProperty("rowkey_stats") private List<Object[]> rowkeyStats = Lists.newArrayList(); @@ -296,15 +293,22 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen } public String getDictResPath(TblColRef col) { - return getDictionaries().get(dictKey(col)); + String r; + String dictKey = col.getIdentity(); + r = getDictionaries().get(dictKey); + + // try Kylin v1.x dict key as well + if (r == null) { + String v1DictKey = col.getTable() + "/" + col.getName(); + r = getDictionaries().get(v1DictKey); + } + + return r; } public void putDictResPath(TblColRef col, String dictResPath) { - getDictionaries().put(dictKey(col), dictResPath); - } - - private String dictKey(TblColRef col) { - return col.getTable() + "/" + col.getName(); + String dictKey = col.getIdentity(); + getDictionaries().put(dictKey, dictResPath); } public void setStorageLocationIdentifier(String storageLocationIdentifier) { @@ -523,14 +527,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen return cubeInstance; } - public String getIndexPath() { - return indexPath; - } - - public void setIndexPath(String indexPath) { - this.indexPath = indexPath; - } - public Map<String, String> getAdditionalInfo() { return additionalInfo; } http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 47eb9c3..5f5814b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -100,7 +100,6 @@ public class JobBuilderSupport { CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); - CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams()); return result; @@ -125,7 +124,6 @@ public class JobBuilderSupport { CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams()); - CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams()); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 4985503..8b9697e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -57,14 +57,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob { DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { @Override public ReadableTable getDistinctValuesFor(TblColRef col) { - return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getName(), col.getType()); + return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType()); } }, new DictionaryProvider() { @Override public Dictionary<String> getDictionary(TblColRef col) throws IOException { - Path colDir = new Path(factColumnsInputPath, col.getName()); - Path dictFile = new Path(colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); + Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); FileSystem fs = HadoopUtil.getFileSystem(dictFile.toString()); if (fs.exists(dictFile) == false) return null; http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java index b0d5a89..65c5869 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@ -135,12 +135,4 @@ public class CubingExecutableUtil { return params.get(MERGED_STATISTICS_PATH); } - public static void setIndexPath(String indexPath, Map<String, String> params) { - params.put(INDEX_PATH, indexPath); - } - - public static String getIndexPath(Map<String, String> params) { - return params.get(INDEX_PATH); - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 2eb694e..9fc8922 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -145,6 +145,9 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setPartitionerClass(FactDistinctColumnPartitioner.class); job.setNumReduceTasks(numberOfReducers); + // important, reducer writes HDFS directly at the moment + job.setReduceSpeculativeExecution(false); + FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 0223914..5d42797 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -59,7 +59,7 @@ import com.google.common.collect.Maps; */ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableKey, Text, NullWritable, Text> { - protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); + private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); private List<TblColRef> columnList; private String statisticsOutput = null; @@ -76,6 +76,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private int uhcReducerCount; private Map<Integer, Integer> reducerIdToColumnIndex = new HashMap<>(); private int taskId; + private boolean isPartitionCol = false; + private int rowCount = 0; //local build dict private boolean isReducerLocalBuildDict; @@ -84,7 +86,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private long timeMinValue = Long.MAX_VALUE; public static final String DICT_FILE_POSTFIX = ".rldict"; public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci"; - private boolean isPartitionCol = false; @Override protected void setup(Context context) throws IOException { @@ -111,6 +112,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK baseCuboidRowCountInMappers = Lists.newArrayList(); cuboidHLLMap = Maps.newHashMap(); samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + logger.info("Reducer " + taskId + " handling stats"); } else if (collectStatistics && (taskId == numberOfTasks - 2)) { // partition col isStatistics = false; @@ -120,6 +122,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK logger.info("Do not have partition col. This reducer will keep empty"); } colValues = Lists.newLinkedList(); + logger.info("Reducer " + taskId + " handling partition column " + col); } else { // normal col isStatistics = false; @@ -135,6 +138,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); builder.init(null, 0); } + logger.info("Reducer " + taskId + " handling column " + col + ", isReducerLocalBuildDict=" + isReducerLocalBuildDict); } } @@ -178,6 +182,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } else if (isPartitionCol) { // partition col String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + logAFewRows(value); long time = DateFormat.stringToMillis(value); timeMinValue = Math.min(timeMinValue, time); timeMaxValue = Math.max(timeMaxValue, time); @@ -185,6 +190,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK // normal col if (isReducerLocalBuildDict) { String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + logAFewRows(value); builder.addValue(value); } else { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); @@ -195,14 +201,22 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } } } + + rowCount++; + } + + private void logAFewRows(String value) { + if (rowCount < 10) { + logger.info("Received value: " + value); + } } private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException { final Configuration conf = context.getConfiguration(); final FileSystem fs = FileSystem.get(conf); final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); - final Path colDir = new Path(outputPath, col.getName()); - final String fileName = col.getName() + "-" + taskId % uhcReducerCount; + final Path colDir = new Path(outputPath, col.getIdentity()); + final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount; final Path outputFile = new Path(colDir, fileName); FSDataOutputStream out = null; @@ -229,7 +243,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException { - final String fileName = col.getName() + DICT_FILE_POSTFIX; + final String fileName = col.getIdentity() + DICT_FILE_POSTFIX; FSDataOutputStream out = getOutputStream(context, fileName); try { String dictClassName = dict.getClass().getName(); @@ -242,7 +256,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } private void outputPartitionInfo(Context context) throws IOException { - final String fileName = col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; + final String fileName = col.getIdentity() + PARTITION_COL_INFO_FILE_POSTFIX; FSDataOutputStream out = getOutputStream(context, fileName); try { out.writeLong(timeMinValue); @@ -256,15 +270,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException { final Configuration conf = context.getConfiguration(); final FileSystem fs = FileSystem.get(conf); - final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); - final Path colDir = new Path(outputPath, col.getName()); - final Path outputFile = new Path(colDir, outputFileName); - FSDataOutputStream out = null; - if (!fs.exists(colDir)) { - fs.mkdirs(colDir); + final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH)); + final Path outputFile = new Path(outputPath, outputFileName); + if (!fs.exists(outputPath)) { + fs.mkdirs(outputPath); } - fs.deleteOnExit(outputFile); - out = fs.create(outputFile); + FSDataOutputStream out = fs.create(outputFile); return out; } http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 5692c76..ed65343 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -31,6 +31,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; @@ -41,6 +43,8 @@ import com.google.common.hash.Hashing; */ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { + private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class); + protected boolean collectStatistics = false; protected CuboidScheduler cuboidScheduler = null; protected int nRowKey; @@ -51,7 +55,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap private int rowCount = 0; private int samplingPercentage; private ByteArray[] row_hashcodes = null; - private ByteBuffer keyBuffer; + private ByteBuffer tmpbuf; private static final Text EMPTY_TEXT = new Text(); public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; public static final byte MARK_FOR_HLL = (byte) 0xFF; @@ -62,7 +66,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap @Override protected void setup(Context context) throws IOException { super.setup(context); - keyBuffer = ByteBuffer.allocate(4096); + tmpbuf = ByteBuffer.allocate(4096); collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); if (collectStatistics) { samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); @@ -127,55 +131,54 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { String[] row = flatTableInputFormat.parseMapperInput(record); - keyBuffer.clear(); - try { - for (int i = 0; i < factDictCols.size(); i++) { - String fieldValue = row[dictionaryColumnIndex[i]]; - if (fieldValue == null) - continue; - int offset = keyBuffer.position(); - - int reducerIndex; - if (uhcIndex[i] == 0) { - //for the normal dictionary column - reducerIndex = columnIndexToReducerBeginId.get(i); - } else { - //for the uhc - reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; - } - keyBuffer.put(Bytes.toBytes(reducerIndex)[3]); - keyBuffer.put(Bytes.toBytes(fieldValue)); - outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); - sortableKey.setText(outputKey); - //judge type - sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType()); - context.write(sortableKey, EMPTY_TEXT); + for (int i = 0; i < factDictCols.size(); i++) { + String fieldValue = row[dictionaryColumnIndex[i]]; + if (fieldValue == null) + continue; + + int reducerIndex; + if (uhcIndex[i] == 0) { + //for the normal dictionary column + reducerIndex = columnIndexToReducerBeginId.get(i); + } else { + //for the uhc + reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; + } + + tmpbuf.clear(); + tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); + tmpbuf.put(Bytes.toBytes(fieldValue)); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); + sortableKey.setText(outputKey); + //judge type + sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType()); + context.write(sortableKey, EMPTY_TEXT); + + // log a few rows for troubleshooting + if (rowCount < 10) { + logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); } - } catch (Exception ex) { - handleErrorRecord(row, ex); } if (collectStatistics) { - if (rowCount < samplingPercentage) { + if (rowCount % 100 < samplingPercentage) { putRowKeyToHLL(row); } if (needFetchPartitionCol == true) { String fieldValue = row[partitionColumnIndex]; if (fieldValue != null) { - int offset = keyBuffer.position(); - keyBuffer.put(MARK_FOR_PARTITION_COL); - keyBuffer.put(Bytes.toBytes(fieldValue)); - outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); + tmpbuf.clear(); + tmpbuf.put(MARK_FOR_PARTITION_COL); + tmpbuf.put(Bytes.toBytes(fieldValue)); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); sortableKey.setText(outputKey); sortableKey.setTypeId((byte) 0); context.write(sortableKey, EMPTY_TEXT); } } } - - if (rowCount++ == 100) - rowCount = 0; + rowCount++; } private void putRowKeyToHLL(String[] row) { @@ -211,10 +214,10 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap for (int i = 0; i < cuboidIds.length; i++) { hll = allCuboidsHLL[i]; - keyBuffer.clear(); - keyBuffer.put(MARK_FOR_HLL); // one byte - keyBuffer.putLong(cuboidIds[i]); - outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); + tmpbuf.clear(); + tmpbuf.put(MARK_FOR_HLL); // one byte + tmpbuf.putLong(cuboidIds[i]); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); hllBuf.clear(); hll.writeRegisters(hllBuf); outputValue.set(hllBuf.array(), 0, hllBuf.position()); http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index d3becfe..dcc9190 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -59,7 +59,6 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { long cubeSizeBytes = cubingJob.findCubeSizeBytes(); segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); - segment.setIndexPath(CubingExecutableUtil.getIndexPath(this.getParams())); segment.setLastBuildTime(System.currentTimeMillis()); segment.setSizeKB(cubeSizeBytes / 1024); segment.setInputRecords(sourceCount); @@ -81,8 +80,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); - Path colDir = new Path(factColumnsInputPath, partitionCol.getName()); - Path outputFile = new Path(colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); + Path outputFile = new Path(factColumnsInputPath, partitionCol.getIdentity() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString()); FSDataInputStream is = null; long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE; http://git-wip-us.apache.org/repos/asf/kylin/blob/8cbd6a6c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index d2fa73e..add5c42 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -75,7 +75,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { mergedSegment.setInputRecords(sourceCount); mergedSegment.setInputRecordsSize(sourceSize); mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); - mergedSegment.setIndexPath(CubingExecutableUtil.getIndexPath(this.getParams())); mergedSegment.setLastBuildTime(System.currentTimeMillis()); try {
