APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/faf08625 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/faf08625 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/faf08625 Branch: refs/heads/yaho-cube-planner Commit: faf08625f2df84a50a70bb93b54cdffd4f40e556 Parents: 74f581e Author: Zhong <[email protected]> Authored: Wed Aug 30 11:17:43 2017 +0800 Committer: Zhong <[email protected]> Committed: Fri Sep 8 11:33:04 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 13 +- .../java/org/apache/kylin/cube/CubeManager.java | 82 ++++++- .../java/org/apache/kylin/cube/CubeUpdate.java | 14 +- .../kylin/cube/common/RowKeySplitter.java | 2 +- .../apache/kylin/cube/cuboid/CuboidUtil.java | 48 ++++ .../cube/cuboid/TreeCuboidSchedulerManager.java | 6 +- .../cube/inmemcubing/InMemCubeBuilder.java | 2 +- .../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +- .../org/apache/kylin/engine/EngineFactory.java | 4 + .../apache/kylin/engine/IBatchCubingEngine.java | 3 + .../kylin/job/constant/ExecutableConstants.java | 5 + .../org/apache/kylin/job/dao/ExecutableDao.java | 18 +- .../org/apache/kylin/job/dao/ExecutablePO.java | 11 + .../kylin/job/execution/ExecutableManager.java | 41 +++- .../job/impl/threadpool/DefaultScheduler.java | 7 +- .../engine/mr/BatchOptimizeJobBuilder2.java | 226 +++++++++++++++++ .../mr/BatchOptimizeJobCheckpointBuilder.java | 89 +++++++ .../org/apache/kylin/engine/mr/CubingJob.java | 7 +- .../org/apache/kylin/engine/mr/IMROutput2.java | 13 +- .../kylin/engine/mr/JobBuilderSupport.java | 51 +++- .../kylin/engine/mr/MRBatchCubingEngine.java | 5 + .../kylin/engine/mr/MRBatchCubingEngine2.java | 5 + .../java/org/apache/kylin/engine/mr/MRUtil.java | 4 + .../engine/mr/common/AbstractHadoopJob.java | 17 ++ .../kylin/engine/mr/common/BatchConstants.java | 1 + .../kylin/engine/mr/common/CubeStatsReader.java | 
126 ++++++---- .../engine/mr/common/CuboidRecommenderUtil.java | 14 +- .../engine/mr/common/CuboidSchedulerUtil.java | 54 ++++ .../engine/mr/common/CuboidStatsReaderUtil.java | 5 +- .../kylin/engine/mr/common/MapReduceUtil.java | 117 +++++++++ .../mr/common/StatisticsDecisionUtil.java | 2 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 2 +- .../steps/CalculateStatsFromBaseCuboidJob.java | 116 +++++++++ .../CalculateStatsFromBaseCuboidMapper.java | 166 +++++++++++++ .../CalculateStatsFromBaseCuboidReducer.java | 112 +++++++++ .../engine/mr/steps/CopyDictionaryStep.java | 69 ++++++ .../engine/mr/steps/CubingExecutableUtil.java | 9 + .../apache/kylin/engine/mr/steps/CuboidJob.java | 28 ++- .../mr/steps/FactDistinctColumnsMapper.java | 37 +-- .../mr/steps/FactDistinctColumnsReducer.java | 3 +- .../mr/steps/FilterRecommendCuboidDataJob.java | 103 ++++++++ .../steps/FilterRecommendCuboidDataMapper.java | 107 ++++++++ .../mr/steps/InMemCuboidFromBaseCuboidJob.java | 154 ++++++++++++ .../steps/InMemCuboidFromBaseCuboidMapper.java | 95 ++++++++ .../steps/InMemCuboidFromBaseCuboidReducer.java | 23 ++ .../kylin/engine/mr/steps/InMemCuboidJob.java | 4 +- .../engine/mr/steps/InMemCuboidMapperBase.java | 10 +- .../kylin/engine/mr/steps/KVGTRecordWriter.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../mr/steps/MergeStatisticsWithOldStep.java | 144 +++++++++++ .../kylin/engine/mr/steps/NDCuboidMapper.java | 12 +- .../kylin/engine/mr/steps/ReducerNumSizing.java | 106 -------- .../UpdateCubeInfoAfterCheckpointStep.java | 69 ++++++ .../steps/UpdateCubeInfoAfterOptimizeStep.java | 72 ++++++ .../mr/steps/UpdateOldCuboidShardJob.java | 105 ++++++++ .../mr/steps/UpdateOldCuboidShardMapper.java | 142 +++++++++++ .../engine/spark/SparkBatchCubingEngine.java | 6 + .../kylin/rest/controller/CubeController.java | 60 +++++ .../kylin/rest/request/JobOptimizeRequest.java | 34 +++ .../apache/kylin/rest/service/CubeService.java | 4 + 
.../apache/kylin/rest/service/JobService.java | 244 ++++++++++++++++++- .../storage/hbase/steps/CreateHTableJob.java | 19 +- .../hbase/steps/HBaseMROutput2Transition.java | 37 ++- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 68 ++++++ 64 files changed, 2911 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 05fb5be..e82144e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -146,6 +146,17 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return segments.getMergingSegments(mergedSegment); } + public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) { + for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) { + if (!optimizedSegment.equals(segment) // + && optimizedSegment.getDateRangeStart() == segment.getDateRangeStart() + && optimizedSegment.getDateRangeEnd() == segment.getDateRangeEnd()) { + return segment; + } + } + return null; + } + public CubeDesc getDescriptor() { return CubeDescManager.getInstance(config).getCubeDesc(descName); } @@ -351,7 +362,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return cuboidsRecommend; case RECOMMEND_MISSING_WITH_BASE: cuboidsRecommend.removeAll(currentCuboids); - currentCuboids.add(getCuboidScheduler().getBaseCuboidId()); + cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId()); return cuboidsRecommend; default: return null; 
http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index a679dde..b815e75 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -75,6 +75,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** * @author yangli9 @@ -425,6 +426,10 @@ public class CubeManager implements IRealizationProvider { cube.setCuboids(update.getCuboids()); } + if (update.getCuboidsRecommend() != null) { + cube.setCuboidsRecommend(update.getCuboidsRecommend()); + } + try { getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER); } catch (IllegalStateException ise) { @@ -503,6 +508,29 @@ public class CubeManager implements IRealizationProvider { return newSegment; } + public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException { + checkReadyForOptimize(cube); + + List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); + CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()]; + int i = 0; + for (CubeSegment segment : readySegments) { + CubeSegment newSegment = newSegment(cube, segment.getDateRangeStart(), segment.getDateRangeEnd(), 0, 0); + newSegment.setSourcePartitionOffsetStart(null); + newSegment.setSourcePartitionOffsetEnd(null); + validateNewSegments(cube, newSegment); + + optimizeSegments[i++] = newSegment; + } + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setCuboidsRecommend(cuboidsRecommend); + cubeBuilder.setToAddSegs(optimizeSegments); + updateCube(cubeBuilder); + + return optimizeSegments; + } + 
public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) throws IOException { checkBuildingSegment(cube); @@ -644,8 +672,15 @@ public class CubeManager implements IRealizationProvider { } private void checkBuildingSegment(CubeInstance cube) { - int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); - if (cube.getBuildingSegments().size() >= maxBuldingSeg) { + checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments()); + } + + public void checkReadyForOptimize(CubeInstance cube) { + checkBuildingSegment(cube, 1); + } + + private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) { + if (cube.getBuildingSegments().size() >= maxBuildingSeg) { throw new IllegalStateException( "There is already " + cube.getBuildingSegments().size() + " building segment; "); } @@ -771,6 +806,49 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); } + public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { + for (CubeSegment seg : optimizedSegments) { + seg.setStatus(SegmentStatusEnum.READY_PENDING); + } + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToUpdateSegs(optimizedSegments); + updateCube(cubeBuilder); + } + + public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids, + CubeSegment... 
optimizedSegments) throws IOException { + if (cube.getSegments().size() != optimizedSegments.length * 2) { + throw new IllegalStateException("For cube " + cube + + ", every READY segment should be optimized and all segments should be READY before optimizing"); + } + CubeSegment[] originalSegments = new CubeSegment[optimizedSegments.length]; + int i = 0; + for (CubeSegment seg : optimizedSegments) { + originalSegments[i++] = cube.getOriginalSegmentToOptimize(seg); + + if (StringUtils.isBlank(seg.getStorageLocationIdentifier())) + throw new IllegalStateException( + "For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier"); + + if (StringUtils.isBlank(seg.getLastBuildJobID())) + throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID"); + + seg.setStatus(SegmentStatusEnum.READY); + } + + logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(optimizedSegments) + + ", to remove segments " + originalSegments); + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(originalSegments) // + .setToUpdateSegs(optimizedSegments) // + .setStatus(RealizationStatusEnum.READY) // + .setCuboids(recommendCuboids) // + .setCuboidsRecommend(Sets.<Long> newHashSet()); + updateCube(cubeBuilder); + } + public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments); List<CubeSegment> newList = Arrays.asList(newSegments); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java index fae20dc..2e1d652 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java +++ 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube; import java.util.Map; +import java.util.Set; import org.apache.kylin.metadata.realization.RealizationStatusEnum; @@ -34,6 +35,7 @@ public class CubeUpdate { private String owner; private int cost = -1; private Map<Long, Long> cuboids = null; + private Set<Long> cuboidsRecommend = null; public CubeUpdate(CubeInstance cubeInstance) { this.cubeInstance = cubeInstance; @@ -106,7 +108,17 @@ public class CubeUpdate { return cuboids; } - public void setCuboids(Map<Long, Long> cuboids) { + public CubeUpdate setCuboids(Map<Long, Long> cuboids) { this.cuboids = cuboids; + return this; + } + + public Set<Long> getCuboidsRecommend() { + return cuboidsRecommend; + } + + public CubeUpdate setCuboidsRecommend(Set<Long> cuboidsRecommend) { + this.cuboidsRecommend = cuboidsRecommend; + return this; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java index cd26347..0c54ecf 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java @@ -115,7 +115,7 @@ public class RowKeySplitter implements java.io.Serializable { offset += RowConstants.ROWKEY_CUBOIDID_LEN; long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length); - Cuboid cuboid = Cuboid.findById(cubeSegment, lastSplittedCuboidId); + Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, lastSplittedCuboidId); // rowkey columns for (int i = 0; i < cuboid.getColumns().size(); i++) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java new file mode 100644 index 0000000..a84f153 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cube.cuboid; + +import com.google.common.base.Preconditions; + +public class CuboidUtil { + + public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) { + Preconditions.checkArgument(nRowKey < Long.SIZE, + "the size of row key could not be large than " + (Long.SIZE - 1)); + + Integer[][] allCuboidsBitSet = new Integer[cuboidIds.length][]; + + for (int j = 0; j < cuboidIds.length; j++) { + Long cuboidId = cuboidIds[j]; + + allCuboidsBitSet[j] = new Integer[Long.bitCount(cuboidId)]; + + long mask = 1L << (nRowKey - 1); + int position = 0; + for (int i = 0; i < nRowKey; i++) { + if ((mask & cuboidId) > 0) { + allCuboidsBitSet[j][position] = i; + position++; + } + mask = mask >> 1; + } + } + return allCuboidsBitSet; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java index 5e8d965..22e636b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java @@ -64,7 +64,7 @@ public class TreeCuboidSchedulerManager { * @param cubeName * @return null if the cube has no pre-built cuboids */ - public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) { + public TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) { TreeCuboidScheduler result = cache.get(cubeName); if (result == null) { CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -83,14 +83,14 @@ public class TreeCuboidSchedulerManager { return result; } - public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, 
Long> cuboidsWithRowCnt) { + public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) { if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) { return null; } return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt); } - public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds, + public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds, Map<Long, Long> cuboidsWithRowCnt) { if (cuboidIds == null || cuboidsWithRowCnt == null) { return null; http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index f63b53f..97bb1de 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -108,7 +108,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { } private GridTable newGridTableByCuboidID(long cuboidID) throws IOException { - GTInfo info = CubeGridTable.newGTInfo(Cuboid.findById(cuboidScheduler, cuboidID), + GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID), new CubeDimEncMap(cubeDesc, dictionaryMap) ); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java index 5a1f668..bb03c4c 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java @@ -36,7 +36,6 @@ import org.apache.kylin.metadata.model.TblColRef; */ public class RowKeyDecoder { - private final CubeSegment cubeSegment; private final CubeDesc cubeDesc; private final RowKeyColumnIO colIO; private final RowKeySplitter rowKeySplitter; @@ -45,7 +44,6 @@ public class RowKeyDecoder { private List<String> values; public RowKeyDecoder(CubeSegment cubeSegment) { - this.cubeSegment = cubeSegment; this.cubeDesc = cubeSegment.getCubeDesc(); this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255); this.colIO = new RowKeyColumnIO(cubeSegment.getDimensionEncodingMap()); @@ -75,7 +73,7 @@ public class RowKeyDecoder { if (this.cuboid != null && this.cuboid.getId() == cuboidID) { return; } - this.cuboid = Cuboid.findById(cubeSegment, cuboidID); + this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID); } private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 78b1efe..03d986b 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -61,4 +61,8 @@ public class EngineFactory { return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter); } + /** Optimize a segment based on the cuboid recommend list produced by the cube planner. 
*/ + public static DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) { + return batchEngine(optimizeSegment).createBatchOptimizeJob(optimizeSegment, submitter); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java index 754dbde..a618eac 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java +++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java @@ -37,6 +37,9 @@ public interface IBatchCubingEngine { /** Merge multiple small segments into a big one. */ public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter); + /** Optimize a segment based on the cuboid recommend list produced by the cube planner. 
*/ + public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter); + public Class<?> getSourceInterface(); public Class<?> getStorageInterface(); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 2de3efa..b39e3aa 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -36,6 +36,9 @@ public final class ExecutableConstants { public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; + public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid"; + public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommend Cuboid Data for Optimization"; + public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid"; public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem"; public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark"; @@ -44,8 +47,10 @@ public final class ExecutableConstants { public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable"; public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile"; public static final String 
STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table"; + public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment"; public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary"; public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics"; + public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization"; public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics"; public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data"; public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index bd020f8..60ccaaf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -92,8 +92,8 @@ public class ExecutableDao { return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER); } - private void writeJobResource(String path, ExecutablePO job) throws IOException { - store.putResource(path, job, JOB_SERIALIZER); + private long writeJobResource(String path, ExecutablePO job) throws IOException { + return store.putResource(path, job, JOB_SERIALIZER); } private ExecutableOutputPO readJobOutputResource(String path) throws IOException { @@ -179,6 +179,20 @@ public class ExecutableDao { } } + public ExecutablePO updateJob(ExecutablePO job) throws PersistentException { + try { + if (getJob(job.getUuid()) == null) { + throw new IllegalArgumentException("job id:" + job.getUuid() + " does not exist"); + } + final long ts = writeJobResource(pathOfJob(job), job); + 
job.setLastModified(ts); + return job; + } catch (IOException e) { + logger.error("error update job:" + job.getUuid(), e); + throw new PersistentException(e); + } + } + public void deleteJob(String uuid) throws PersistentException { try { store.deleteResource(pathOfJob(uuid)); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java index 75717e0..f48c876 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java @@ -39,6 +39,9 @@ public class ExecutablePO extends RootPersistentEntity { @JsonProperty("tasks") private List<ExecutablePO> tasks; + @JsonProperty("tasks_check") + private List<ExecutablePO> tasksForCheck; + @JsonProperty("type") private String type; @@ -61,6 +64,14 @@ public class ExecutablePO extends RootPersistentEntity { this.tasks = tasks; } + public List<ExecutablePO> getTasksForCheck() { + return tasksForCheck; + } + + public void setTasksForCheck(List<ExecutablePO> tasksForCheck) { + this.tasksForCheck = tasksForCheck; + } + public String getType() { return type; } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 124a5a9..bd2a2d3 100755 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -101,6 +101,13 @@ public class ExecutableManager 
{ } result.setTasks(tasks); } + if (executable instanceof CheckpointExecutable) { + List<ExecutablePO> tasksForCheck = Lists.newArrayList(); + for (AbstractExecutable taskForCheck : ((CheckpointExecutable) executable).getSubTasksForCheck()) { + tasksForCheck.add(parse(taskForCheck)); + } + result.setTasksForCheck(tasksForCheck); + } return result; } @@ -126,6 +133,23 @@ public class ExecutableManager { } } + public void updateCheckpointJob(String jobId, List<AbstractExecutable> subTasksForCheck) { + try { + final ExecutablePO job = executableDao.getJob(jobId); + Preconditions.checkArgument(job != null, "there is no related job for job id:" + jobId); + + List<ExecutablePO> tasksForCheck = Lists.newArrayListWithExpectedSize(subTasksForCheck.size()); + for (AbstractExecutable taskForCheck : subTasksForCheck) { + tasksForCheck.add(parse(taskForCheck)); + } + job.setTasksForCheck(tasksForCheck); + executableDao.updateJob(job); + } catch (PersistentException e) { + logger.error("fail to update checkpoint job:" + jobId, e); + throw new RuntimeException(e); + } + } + //for ut public void deleteJob(String jobId) { try { @@ -353,7 +377,15 @@ public class ExecutableManager { if (job == null) { return; } - + if (job.getStatus().isFinalState()) { + if (job.getStatus() != ExecutableState.DISCARDED) { + logger.warn("The status of job " + jobId + " is " + job.getStatus().toString() + + ". 
It's final state and cannot be transfer to be discarded!!!"); + } else { + logger.warn("The job " + jobId + " has been discarded."); + } + return; + } if (job instanceof DefaultChainedExecutable) { List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { @@ -505,6 +537,13 @@ public class ExecutableManager { ((ChainedExecutable) result).addTask(parseTo(subTask)); } } + List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck(); + if (tasksForCheck != null && !tasksForCheck.isEmpty()) { + Preconditions.checkArgument(result instanceof CheckpointExecutable); + for (ExecutablePO subTaskForCheck : tasksForCheck) { + ((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck)); + } + } return result; } catch (ReflectiveOperationException e) { throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index b119e1b..315671c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -88,8 +88,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti nRunning++; continue; } - final Output output = executableManager.getOutput(id); - if ((output.getState() != ExecutableState.READY)) { + final AbstractExecutable executable = executableManager.getJob(id); + if (!executable.isReady()) { + final Output output = executableManager.getOutput(id); // logger.debug("Job id:" + id + " not runnable"); if (output.getState() == 
ExecutableState.DISCARDED) { nDiscarded++; @@ -105,10 +106,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti continue; } nReady++; - AbstractExecutable executable = null; String jobDesc = null; try { - executable = executableManager.getJob(id); jobDesc = executable.toString(); logger.info(jobDesc + " prepare to schedule"); context.addRunningJob(executable); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java new file mode 100644 index 0000000..645525b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr; + +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CopyDictionaryStep; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.FilterRecommendCuboidDataJob; +import org.apache.kylin.engine.mr.steps.InMemCuboidFromBaseCuboidJob; +import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep; +import org.apache.kylin.engine.mr.steps.NDCuboidJob; +import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep; +import org.apache.kylin.engine.mr.steps.UpdateOldCuboidShardJob; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class BatchOptimizeJobBuilder2 extends JobBuilderSupport { + private static final Logger logger = LoggerFactory.getLogger(BatchOptimizeJobBuilder2.class); + + private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide; + + public BatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) { + super(optimizeSegment, submitter); + this.outputSide = MRUtil.getBatchOptimizeOutputSide2(optimizeSegment); + } + + public CubingJob build() { + logger.info("MR_V2 new job to OPTIMIZE a segment " + seg); + + final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config); + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + + final String jobId = result.getId(); + final String cuboidRootPath = getCuboidRootPath(jobId); + final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId); + + CubeSegment oldSegment = 
seg.getCubeInstance().getOriginalSegmentToOptimize(seg); + Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + seg); + + // Phase 1: Prepare base cuboid data from old segment + String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*"; + result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath)); + + // Phase 2: Prepare dictionary and statistics for new segment + result.addTask(createCopyDictionaryStep()); + String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath); + String optStatsDstPath = getOptimizationStatisticsPath(jobId); + result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath, + CuboidModeEnum.RECOMMEND_MISSING)); + result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId))); + outputSide.addStepPhase1_CreateHTable(result); + + result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath + "*", cuboidRootPath)); + + // Phase 3: Build Cube for Missing Cuboid Data + addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath); // layer cubing + result.addTask(createInMemCubingStep(jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath)); + + outputSide.addStepPhase2_BuildCube(result); + + // Phase 4: Update Metadata & Cleanup + result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId)); + + return result; + } + + public MapReduceExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + 
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_Filter_Recommend_Cuboid_Data_" + seg.getRealization().getName()); + + result.setMapReduceParams(cmd.toString()); + result.setMapReduceJobClass(FilterRecommendCuboidDataJob.class); + return result; + } + + public CopyDictionaryStep createCopyDictionaryStep() { + CopyDictionaryStep result = new CopyDictionaryStep(); + result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + return result; + } + + private MapReduceExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_Update_Old_Cuboid_Shard_" + seg.getRealization().getName()); + + result.setMapReduceParams(cmd.toString()); + result.setMapReduceJobClass(UpdateOldCuboidShardJob.class); + return result; + } + + private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(final String jobId, final String optStatsPath, + final String mergedStatisticsFolder) { + MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep(); + result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD); + + 
CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams()); + CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams()); + + return result; + } + + private void addLayerCubingSteps(final CubingJob result, final String jobId, final CuboidModeEnum cuboidMode, + final String cuboidRootPath) { + // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime + final int maxLevel = seg.getCubeDesc().getRowkey().getRowKeyColumns().length; + // Don't need to build base cuboid + // n dim cuboid steps + for (int i = 1; i <= maxLevel; i++) { + String parentCuboidPath = i == 1 ? getBaseCuboidPath(cuboidRootPath) + : getCuboidOutputPathsByLevel(cuboidRootPath, i - 1); + result.addTask(createNDimensionCuboidStep(parentCuboidPath, + getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId, cuboidMode)); + } + } + + private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level, + String jobId, CuboidModeEnum cuboidMode) { + // ND cuboid job + MapReduceExecutable ndCuboidStep = new MapReduceExecutable(); + + ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : level " + level); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); + 
appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString()); + + ndCuboidStep.setMapReduceParams(cmd.toString()); + ndCuboidStep.setMapReduceJobClass(getNDCuboidJob()); + return ndCuboidStep; + } + + private MapReduceExecutable createInMemCubingStep(String jobId, CuboidModeEnum cuboidMode, String cuboidRootPath) { + MapReduceExecutable cubeStep = new MapReduceExecutable(); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX); + + cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE); + + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getBaseCuboidPath(cuboidRootPath)); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getInMemCuboidPath(cuboidRootPath)); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Kylin_Cube_Builder_" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString()); + + cubeStep.setMapReduceParams(cmd.toString()); + cubeStep.setMapReduceJobClass(InMemCuboidFromBaseCuboidJob.class); + cubeStep.setCounterSaveAs( + CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES); + return cubeStep; + } + + public UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) { + final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + 
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + + return result; + } + + protected Class<? extends AbstractHadoopJob> getNDCuboidJob() { + return NDCuboidJob.class; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java new file mode 100644 index 0000000..281cd64 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterCheckpointStep; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; + +import com.google.common.base.Preconditions; + +public class BatchOptimizeJobCheckpointBuilder { + +    protected static SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); + +    final protected CubeInstance cube; +    final protected String submitter; + +    private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide; + +    public BatchOptimizeJobCheckpointBuilder(CubeInstance cube, String submitter) { +        this.cube = cube; +        this.submitter = submitter; + +        Preconditions.checkNotNull(cube.getFirstSegment(), "Cube " + cube + " is empty!!!"); +        this.outputSide = MRUtil.getBatchOptimizeOutputSide2(cube.getFirstSegment()); +    } + +    public CheckpointExecutable build() { +        KylinConfig kylinConfig = cube.getConfig(); +        List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(), +                cube.getName()); +        if (projList == null || projList.size() == 0) { +            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!"); +        } else if (projList.size() >= 2) { +            throw new RuntimeException("Find more than one project containing the cube " + cube.getName() + +                    ". It doesn't meet the uniqueness requirement!!! 
"); + } + + CheckpointExecutable checkpointJob = new CheckpointExecutable(); + checkpointJob.setSubmitter(submitter); + CubingExecutableUtil.setCubeName(cube.getName(), checkpointJob.getParams()); + checkpointJob.setName( + cube.getName() + " - OPTIMIZE CHECKPOINT - " + format.format(new Date(System.currentTimeMillis()))); + checkpointJob.setDeployEnvName(kylinConfig.getDeployEnv()); + checkpointJob.setProjectName(projList.get(0).getName()); + + // Phase 1: Update cube information + checkpointJob.addTask(createUpdateCubeInfoAfterCheckpointStep()); + + // Phase 2: Garbage collection + outputSide.addStepPhase3_Cleanup(checkpointJob); + + return checkpointJob; + } + + private UpdateCubeInfoAfterCheckpointStep createUpdateCubeInfoAfterCheckpointStep() { + UpdateCubeInfoAfterCheckpointStep result = new UpdateCubeInfoAfterCheckpointStep(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + + CubingExecutableUtil.setCubeName(cube.getName(), result.getParams()); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 3cd8931..1fa56c4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -65,7 +65,7 @@ public class CubingJob extends DefaultChainedExecutable { } public enum CubingJobTypeEnum { - BUILD("BUILD"), MERGE("MERGE"); + BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE"); private final String name; @@ -106,6 +106,10 @@ public class CubingJob extends DefaultChainedExecutable { return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config); } + public static CubingJob createOptimizeJob(CubeSegment seg, String submitter, 
JobEngineConfig config) { + return initCubingJob(seg, CubingJobTypeEnum.OPTIMIZE.toString(), submitter, config); + } + public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) { return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), submitter, config); } @@ -135,6 +139,7 @@ public class CubingJob extends DefaultChainedExecutable { result.setJobType(jobType); CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setSegmentName(seg.getName(), result.getParams()); result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + format.format(new Date(System.currentTimeMillis()))); result.setSubmitter(submitter); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java index 69bba0a..e33d5c8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java @@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.job.execution.DefaultChainedExecutable; public interface IMROutput2 { @@ -67,7 +68,7 @@ public interface IMROutput2 { public void configureJobInput(Job job, String input) throws Exception; /** Configure the OutputFormat of given job. 
*/ - public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception; + public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler, int level) throws Exception; } @@ -113,4 +114,14 @@ public interface IMROutput2 { public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube); } + public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg); + + public interface IMRBatchOptimizeOutputSide2 { + + public void addStepPhase1_CreateHTable(DefaultChainedExecutable jobFlow); + + public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow); + + public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index c1ed345..27f1d7d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.util.List; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; @@ -47,6 +49,10 @@ public class JobBuilderSupport { final public static String 
LayeredCuboidFolderPrefix = "level_"; + final public static String PathNameCuboidBase = "base_cuboid"; + final public static String PathNameCuboidOld = "old"; + final public static String PathNameCuboidInMem = "in_memory"; + public JobBuilderSupport(CubeSegment seg, String submitter) { Preconditions.checkNotNull(seg, "segment cannot be null"); this.config = new JobEngineConfig(seg.getConfig()); @@ -81,6 +87,31 @@ public class JobBuilderSupport { return result; } + public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath) { + return createCalculateStatsFromBaseCuboid(inputPath, outputPath, CuboidModeEnum.CURRENT); + } + + public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath, + CuboidModeEnum cuboidMode) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID); + result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, + String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, + "Calculate_Stats_For_Segment_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString()); + + result.setMapReduceParams(cmd.toString()); + return result; + } + public HadoopShellExecutable createBuildDictionaryStep(String jobId) { // base cuboid job HadoopShellExecutable buildDictionaryStep = new 
HadoopShellExecutable(); @@ -176,6 +207,18 @@ public class JobBuilderSupport { return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS; } + public String getOptimizationRootPath(String jobId) { + return getRealizationRootPath(jobId) + "/optimize"; + } + + public String getOptimizationStatisticsPath(String jobId) { + return getOptimizationRootPath(jobId) + "/statistics"; + } + + public String getOptimizationCuboidPath(String jobId) { + return getOptimizationRootPath(jobId) + "/cuboid/"; + } + // ============================================================================ // static methods also shared by other job flow participant // ---------------------------------------------------------------------------- @@ -197,11 +240,17 @@ public class JobBuilderSupport { public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) { if (level == 0) { - return cuboidRootPath + LayeredCuboidFolderPrefix + "base_cuboid"; + return cuboidRootPath + LayeredCuboidFolderPrefix + PathNameCuboidBase; } else { return cuboidRootPath + LayeredCuboidFolderPrefix + level + "_cuboid"; } } + public static String getBaseCuboidPath(String cuboidRootPath) { + return cuboidRootPath + PathNameCuboidBase; + } + public static String getInMemCuboidPath(String cuboidRootPath) { + return cuboidRootPath + PathNameCuboidInMem; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java index 681c545..74c9b6d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java @@ -48,6 +48,11 @@ public class 
MRBatchCubingEngine implements IBatchCubingEngine { } @Override + public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) { + return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + } + + @Override public Class<?> getSourceInterface() { return IMRInput.class; } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java index d9fdcb9..665e791 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java @@ -48,6 +48,11 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine { } @Override + public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) { + return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + } + + @Override public Class<?> getSourceInterface() { return IMRInput.class; } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index cbb68d2..f52d64e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -75,6 +75,10 @@ public class MRUtil { return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg); } + public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) { + 
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg); + } + // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe public static int runMRJob(Tool tool, String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index a608c40..7dbae55 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -93,6 +94,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + protected static final 
Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE) + .hasArg().isRequired(false).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; @@ -459,7 +462,21 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { dumpKylinPropsAndMetadata(dumpList, cube.getConfig(), conf); } + protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, Configuration conf) throws IOException { + Set<String> dumpList = new LinkedHashSet<>(); + CubeInstance cube = segments.get(0).getCubeInstance(); + dumpList.addAll(collectCubeMetadata(cube)); + for (CubeSegment segment : segments) { + dumpList.addAll(segment.getDictionaryPaths()); + } + dumpKylinPropsAndMetadata(dumpList, cube.getConfig(), conf); + } + protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException { + attachSegmentMetadata(segment, conf, true, false); + } + + protected void attachSegmentMetadataWithAll(CubeSegment segment, Configuration conf) throws IOException { attachSegmentMetadata(segment, conf, true, true); } http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 74a9d09..eb7673b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -70,6 +70,7 @@ public interface BatchConstants { */ String ARG_INPUT = "input"; String ARG_OUTPUT = "output"; + String ARG_CUBOID_MODE = "cuboidMode"; String ARG_JOB_NAME = "jobname"; String ARG_CUBING_JOB_ID = "cubingJobId"; 
String ARG_CUBE_NAME = "cubename"; http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 8f1da6e..a6a9710 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -80,51 +80,52 @@ public class CubeStatsReader { final CuboidScheduler cuboidScheduler; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { + this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig); + } + + /** + * @param cuboidScheduler if it's null, part of it's functions will not be supported + */ + public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig) + throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); - cuboidScheduler = cubeSegment.getCuboidScheduler(); String statsKey = cubeSegment.getStatisticsResourcePath(); File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); - Reader reader = null; - - try { - Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); - - Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); - Option seqInput = SequenceFile.Reader.file(path); - reader = new SequenceFile.Reader(hadoopConf, seqInput); - - int percentage = 100; - int mapperNumber = 0; - double mapperOverlapRatio = 0; - Map<Long, HLLCounter> counterMap = Maps.newHashMap(); - - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), 
hadoopConf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - percentage = Bytes.toInt(value.getBytes()); - } else if (key.get() == -1) { - mapperOverlapRatio = Bytes.toDouble(value.getBytes()); - } else if (key.get() == -2) { - mapperNumber = Bytes.toInt(value.getBytes()); - } else if (key.get() > 0) { - HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision()); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - counterMap.put(key.get(), hll); - } - } - - this.seg = cubeSegment; - this.samplingPercentage = percentage; - this.mapperNumberOfFirstBuild = mapperNumber; - this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio; - this.cuboidRowEstimatesHLL = counterMap; + Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); + + CubeStatsResult cubeStatsResult = new CubeStatsResult(); + cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + tmpSeqFile.delete(); + + this.seg = cubeSegment; + this.cuboidScheduler = cuboidScheduler; + this.samplingPercentage = cubeStatsResult.percentage; + this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber; + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio; + this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap; + } - } finally { - IOUtils.closeStream(reader); - tmpSeqFile.delete(); - } + /** + * Read statistics from + * @param path + * rather than + * @param cubeSegment + * + * Since the statistics are from + * @param path + * cuboid scheduler should be provided by default + */ + public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path) + throws IOException { + CubeStatsResult cubeStatsResult = new CubeStatsResult(); + cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + + this.seg = cubeSegment; + this.cuboidScheduler = cuboidScheduler; + this.samplingPercentage = 
cubeStatsResult.percentage; + this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber; + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio; + this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap; } private File writeTmpSeqFile(InputStream inputStream) throws IOException { @@ -140,6 +141,10 @@ public class CubeStatsReader { return tempFile; } + public int getSamplingPercentage() { + return samplingPercentage; + } + public Map<Long, HLLCounter> getCuboidRowEstimatesHLLOrigin() { return this.cuboidRowEstimatesHLL; } @@ -253,6 +258,9 @@ public class CubeStatsReader { //return MB public double estimateLayerSize(int level) { + if (cuboidScheduler == null) { + throw new UnsupportedOperationException("cuboid scheduler is null"); + } List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); Map<Long, Double> cuboidSizeMap = getCuboidSizeMap(); double ret = 0; @@ -265,11 +273,17 @@ public class CubeStatsReader { } public List<Long> getCuboidsByLayer(int level) { + if (cuboidScheduler == null) { + throw new UnsupportedOperationException("cuboid scheduler is null"); + } List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); return layeredCuboids.get(level); } private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) { + if (cuboidScheduler == null) { + throw new UnsupportedOperationException("cuboid scheduler is null"); + } long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc()); int dimensionCount = Long.bitCount(baseCuboid); printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out); @@ -317,6 +331,36 @@ public class CubeStatsReader { return new DecimalFormat("#.##").format(input); } + private class CubeStatsResult { + private int percentage = 100; + private double mapperOverlapRatio = 0; + private int mapperNumber = 0; + Map<Long, HLLCounter> counterMap = Maps.newHashMap(); + + void initialize(Path path, int 
precision) throws IOException { + Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); + Option seqInput = SequenceFile.Reader.file(path); + try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) { + LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); + BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf); + while (reader.next(key, value)) { + if (key.get() == 0L) { + percentage = Bytes.toInt(value.getBytes()); + } else if (key.get() == -1) { + mapperOverlapRatio = Bytes.toDouble(value.getBytes()); + } else if (key.get() == -2) { + mapperNumber = Bytes.toInt(value.getBytes()); + } else if (key.get() > 0) { + HLLCounter hll = new HLLCounter(precision); + ByteArray byteArray = new ByteArray(value.getBytes()); + hll.readRegisters(byteArray.asBuffer()); + counterMap.put(key.get(), hll); + } + } + } + } + } + public static void main(String[] args) throws IOException { System.out.println("CubeStatsReader is used to read cube statistic saved in metadata store"); KylinConfig config = KylinConfig.getInstanceFromEnv(); http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java index ba3f023..649eeb6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java @@ -39,20 +39,21 @@ public class CuboidRecommenderUtil { return null; } - CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, 
segment.getConfig()); if (cubeStatsReader.getCuboidRowEstimatesHLL() == null || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { logger.info("Cuboid Statistics is not enabled."); return null; } - long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + CubeInstance cube = segment.getCubeInstance(); + long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { logger.info("Base cuboid count in cuboid statistics is 0."); return null; } - String key = segment.getCubeInstance().getName(); + String key = cube.getName(); CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), cubeStatsReader.getCuboidSizeMap()).build(); return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false); @@ -81,20 +82,21 @@ public class CuboidRecommenderUtil { return null; } - CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig()); + CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig()); if (cubeStatsReader.getCuboidRowEstimatesHLL() == null || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { logger.info("Cuboid Statistics is not enabled."); return null; } - long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId(); + CubeInstance cube = segment.getCubeInstance(); + long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { logger.info("Base cuboid count in cuboid statistics is 0."); return null; } - String key = segment.getCubeInstance().getName() + "-" + segment.getName(); + String key = cube.getName() + "-" + segment.getName(); CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, 
cubeStatsReader.getCuboidRowEstimatesHLL(), cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap) .setRollingUpCountSourceMap(rollingUpCountSourceMap, http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java new file mode 100644 index 0000000..d684c04 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
+import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+
+public class CuboidSchedulerUtil {
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
+        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidModeName));
+    }
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
+        return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+    }
+
+    public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, Set<Long> cuboidSet) {
+        CuboidScheduler cuboidScheduler;
+        try {
+            cuboidScheduler = TreeCuboidSchedulerManager.getInstance().getTreeCuboidScheduler(segment.getCubeDesc(), //
+                    CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment));
+        } catch (IOException e) {
+            throw new RuntimeException("Fail to read cube stats for segment " + segment + " due to " + e);
+        }
+
+        if (cuboidScheduler == null) {
+            cuboidScheduler = new DefaultCuboidScheduler(segment.getCubeDesc());
+        }
+        return cuboidScheduler;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/faf08625/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index aaf9aa3..bfb37ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java @@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil { Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size()); Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size()); for (CubeSegment pSegment : segmentList) { - CubeStatsReader pReader = new CubeStatsReader(pSegment, pSegment.getConfig()); + CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig()); Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowEstimatesHLLOrigin(); if (pHLLMap == null || pHLLMap.isEmpty()) { logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled."); @@ -113,7 +113,7 @@ public class CuboidStatsReaderUtil { return null; } - CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cubeSegment.getConfig()); + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig()); if (cubeStatsReader.getCuboidRowEstimatesHLL() == null || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) { logger.info("Cuboid Statistics is not enabled."); @@ -132,4 +132,5 @@ public class CuboidStatsReaderUtil { } return cuboidsWithStats; } + }
