KYLIN-2857 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e4624779 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e4624779 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e4624779 Branch: refs/heads/security_update Commit: e4624779bcf8e3a68b53c77700c9ff695559c76a Parents: 21a2560 Author: Li Yang <liy...@apache.org> Authored: Sun Sep 17 19:03:51 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Sun Sep 17 21:34:38 2017 +0800 ---------------------------------------------------------------------- .../engine/mr/common/MapReduceExecutable.java | 29 +++++--------------- .../test_case_data/sandbox/kylin_job_conf.xml | 26 ++++++++++++++++++ .../sandbox/kylin_job_conf_inmem.xml | 17 ++++++++---- 3 files changed, 45 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 4e6458f..94874dc 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -32,6 +32,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; @@ -80,10 +81,8 @@ public class MapReduceExecutable extends AbstractExecutable { return; } try { - String params = getMapReduceParams(); - String[] args = params.trim().split("\\s+"); - Configuration conf = HadoopUtil.getCurrentConfiguration(); - overwriteJobConf(conf, executableContext.getConfig(), args); + Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration()); + overwriteJobConf(conf, executableContext.getConfig(), getMapReduceParams().trim().split("\\s+")); Job job = new Cluster(conf).getJob(JobID.forName(mrJobId)); if (job == null || job.getJobState() == JobStatus.State.FAILED) { //remove previous mr job info @@ -107,15 +106,12 @@ public class MapReduceExecutable extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final String mapReduceJobClass = getMapReduceJobClass(); - String params = getMapReduceParams(); Preconditions.checkNotNull(mapReduceJobClass); - Preconditions.checkNotNull(params); try { Job job; ExecutableManager mgr = getManager(); - String[] args = params.trim().split("\\s+"); - Configuration conf = HadoopUtil.getCurrentConfiguration(); - String[] jobArgs = overwriteJobConf(conf, context.getConfig(), args); + Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration()); + String[] jobArgs = overwriteJobConf(conf, context.getConfig(), getMapReduceParams().trim().split("\\s+")); final Map<String, String> extra = mgr.getOutput(getId()).getExtra(); if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) { job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); @@ -125,12 +121,9 @@ public class MapReduceExecutable extends AbstractExecutable { final AbstractHadoopJob hadoopJob = constructor.newInstance(); hadoopJob.setConf(conf); hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away - logger.info("parameters of the MapReduceExecutable: {}", params); + logger.info("parameters of the MapReduceExecutable: {}", getMapReduceParams()); try { - //for async mr job, ToolRunner just return 0; - // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale - // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe hadoopJob.run(jobArgs); if (hadoopJob.isSkipped()) { @@ -150,14 +143,6 @@ public class MapReduceExecutable extends AbstractExecutable { final StringBuilder output = new StringBuilder(); final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); - // final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); - // if (restStatusCheckUrl == null) { - // logger.error("restStatusCheckUrl is null"); - // return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); - // } - // String mrJobId = hadoopCmdOutput.getMrJobId(); - // boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); - // HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded() && !isPaused()) { @@ -269,7 +254,7 @@ public class MapReduceExecutable extends AbstractExecutable { String fileName = commandLine.getOptionValue(BatchConstants.ARG_CONF); String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME); Preconditions.checkArgument(cubeName != null && fileName != null, "Can't get job config"); - conf.addResource(fileName); + conf.addResource(new Path(fileName)); for (Map.Entry<String, String> entry : CubeManager.getInstance(config).getCube(cubeName).getConfig() .getMRConfigOverride().entrySet()) { conf.set(entry.getKey(), entry.getValue()); http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/examples/test_case_data/sandbox/kylin_job_conf.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml index 8f5817e..64e69a4 100644 --- a/examples/test_case_data/sandbox/kylin_job_conf.xml +++ b/examples/test_case_data/sandbox/kylin_job_conf.xml @@ -76,4 +76,30 @@ <description>Block replication</description> </property> + <!-- memory configuration for sandbox --> + <property> + <name>mapreduce.map.memory.mb</name> + <value>384</value> + </property> + + <property> + <name>mapreduce.map.java.opts</name> + <value>-Xmx350m</value> + </property> + + <property> + <name>mapreduce.task.io.sort.mb</name> + <value>30</value> + </property> + + <property> + <name>mapreduce.reduce.memory.mb</name> + <value>384</value> + </property> + + <property> + <name>mapreduce.reduce.java.opts</name> + <value>-Xmx350m</value> + </property> + </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/e4624779/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml index b05495f..5094d24 100644 --- a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml +++ b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml @@ -75,23 +75,30 @@ <description>Block replication</description> </property> - <!--Additional config for in-mem cubing, giving mapper more memory --> + <!-- memory configuration for sandbox --> <property> <name>mapreduce.map.memory.mb</name> <value>768</value> - <description></description> </property> <property> <name>mapreduce.map.java.opts</name> <value>-Xmx700m</value> - <description></description> </property> <property> <name>mapreduce.task.io.sort.mb</name> - <value>10</value> - <description></description> + <value>30</value> + </property> + + <property> + <name>mapreduce.reduce.memory.mb</name> + <value>512</value> + </property> + + <property> + <name>mapreduce.reduce.java.opts</name> + <value>-Xmx400m</value> </property> </configuration> \ No newline at end of file