This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch KYLIN-3368 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 18b34e52b35abe82c9df1a3d88dfff4f2e278edf Author: shaofengshi <[email protected]> AuthorDate: Fri May 11 21:29:40 2018 +0800 KYLIN-3368 Move Spark cubing metadata dump to job folder --- .../java/org/apache/kylin/engine/mr/JobBuilderSupport.java | 5 +++++ .../kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java | 4 +--- .../kylin/engine/spark/SparkBatchCubingJobBuilder2.java | 12 ++++-------- .../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 8 ++++---- .../localmeta/cube_desc/ci_inner_join_cube.json | 3 ++- .../localmeta/cube_desc/ci_left_join_cube.json | 3 ++- 6 files changed, 18 insertions(+), 17 deletions(-) diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 8228f87..8a420df 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -261,4 +261,9 @@ public class JobBuilderSupport { public static String getInMemCuboidPath(String cuboidRootPath) { return cuboidRootPath + PathNameCuboidInMem; } + + public String getDumpMetadataPath(String jobId) { + return getRealizationRootPath(jobId) + "/metadata"; + } + } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 018abab..8b478aa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.List; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -39,14 +38,13 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class); - private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - public UpdateCubeInfoAfterMergeStep() { super(); } @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 7d76ce4..57d4fb0 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -67,7 +67,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { return ""; } - public static void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, + public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, final String jobId, final String cuboidRootPath) { IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); @@ -75,15 +75,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), - getSegmentMetadataUrl(seg.getConfig(), seg.getUuid())); + getSegmentMetadataUrl(seg.getConfig(), jobId)); sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); sparkExecutable.setJobId(jobId); StringBuilder jars = new StringBuilder(); - StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar - StringUtil.appendWithSeparator(jars, findJar("org.apache.htrace.Trace", null)); // htrace-core.jar - StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar @@ -92,10 +89,9 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE); } - private static String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) { + private String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) { Map<String, String> param = new HashMap<>(); - param.put("path", kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID); + param.put("path", getDumpMetadataPath(jobId)); return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString(); -// return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs"; } } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 714991d..76e7e22 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.HadoopUtil; @@ -488,9 +489,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } protected void deleteHDFSMeta(String metaUrl) throws IOException { - int cut = metaUrl.indexOf('@'); - String path = metaUrl.substring(0, cut); - HadoopUtil.getFileSystem(path).delete(new Path(path), true); - logger.info("Delete metadata in HDFS for this job: " + path); + String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path"); + HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true); + logger.info("Delete metadata in HDFS for this job: " + realHdfsPath); } } diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json index ec1100a..47b8e24 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json @@ -569,7 +569,8 @@ "engine_type": 4, "storage_type": 2, "override_kylin_properties": { - "kylin.cube.algorithm": "LAYER" + "kylin.cube.algorithm": "LAYER", + "kylin.cube.size-estimate-ratio" : 0.01 }, "partition_date_start": 0 } diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json index f1a42b1..c2419b6 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json @@ -580,7 +580,8 @@ "engine_type": 2, "storage_type": 2, "override_kylin_properties": { - "kylin.cube.algorithm": "INMEM" + "kylin.cube.algorithm": "INMEM", + "kylin.cube.size-estimate-ratio" : 0.01 }, "partition_date_start": 0 } -- To stop receiving notification emails like this one, please contact [email protected].
