Repository: kylin
Updated Branches:
  refs/heads/ly 09e8bbe90 -> 8b276caf0 (forced update)


KYLIN-2857 code review


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8b276caf
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8b276caf
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8b276caf

Branch: refs/heads/ly
Commit: 8b276caf013a3d07bfd735f4385bace45fe17bf1
Parents: e39d4e6
Author: Li Yang <liy...@apache.org>
Authored: Sun Sep 17 19:03:51 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Sun Sep 17 20:19:57 2017 +0800

----------------------------------------------------------------------
 .../engine/mr/common/MapReduceExecutable.java   | 29 +++++---------------
 .../test_case_data/sandbox/kylin_job_conf.xml   | 26 ++++++++++++++++++
 .../sandbox/kylin_job_conf_inmem.xml            | 17 ++++++++----
 3 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8b276caf/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4e6458f..94874dc 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -80,10 +81,8 @@ public class MapReduceExecutable extends AbstractExecutable {
                 return;
             }
             try {
-                String params = getMapReduceParams();
-                String[] args = params.trim().split("\\s+");
-                Configuration conf = HadoopUtil.getCurrentConfiguration();
-                overwriteJobConf(conf, executableContext.getConfig(), args);
+                Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration());
+                overwriteJobConf(conf, executableContext.getConfig(), getMapReduceParams().trim().split("\\s+"));
                 Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
                 if (job == null || job.getJobState() == JobStatus.State.FAILED) {
                     //remove previous mr job info
@@ -107,15 +106,12 @@ public class MapReduceExecutable extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final String mapReduceJobClass = getMapReduceJobClass();
-        String params = getMapReduceParams();
         Preconditions.checkNotNull(mapReduceJobClass);
-        Preconditions.checkNotNull(params);
         try {
             Job job;
             ExecutableManager mgr = getManager();
-            String[] args = params.trim().split("\\s+");
-            Configuration conf = HadoopUtil.getCurrentConfiguration();
-            String[] jobArgs = overwriteJobConf(conf, context.getConfig(), args);
+            Configuration conf = new Configuration(HadoopUtil.getCurrentConfiguration());
+            String[] jobArgs = overwriteJobConf(conf, context.getConfig(), getMapReduceParams().trim().split("\\s+"));
             final Map<String, String> extra = mgr.getOutput(getId()).getExtra();
             if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
                 job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
@@ -125,12 +121,9 @@ public class MapReduceExecutable extends AbstractExecutable {
                 final AbstractHadoopJob hadoopJob = constructor.newInstance();
                 hadoopJob.setConf(conf);
                 hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
-                logger.info("parameters of the MapReduceExecutable: {}", params);
+                logger.info("parameters of the MapReduceExecutable: {}", getMapReduceParams());
                 try {
-                    //for async mr job, ToolRunner just return 0;
 
-                    // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
-                    // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
                     hadoopJob.run(jobArgs);
 
                     if (hadoopJob.isSkipped()) {
@@ -150,14 +143,6 @@ public class MapReduceExecutable extends AbstractExecutable {
             final StringBuilder output = new StringBuilder();
             final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
 
-            //            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
-            //            if (restStatusCheckUrl == null) {
-            //                logger.error("restStatusCheckUrl is null");
-            //                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
-            //            }
-            //            String mrJobId = hadoopCmdOutput.getMrJobId();
-            //            boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos();
-            //            HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
             while (!isDiscarded() && !isPaused()) {
 
@@ -269,7 +254,7 @@ public class MapReduceExecutable extends AbstractExecutable {
         String fileName = commandLine.getOptionValue(BatchConstants.ARG_CONF);
         String cubeName = commandLine.getOptionValue(BatchConstants.ARG_CUBE_NAME);
         Preconditions.checkArgument(cubeName != null && fileName != null, "Can't get job config");
-        conf.addResource(fileName);
+        conf.addResource(new Path(fileName));
         for (Map.Entry<String, String> entry : CubeManager.getInstance(config).getCube(cubeName).getConfig()
                 .getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/kylin/blob/8b276caf/examples/test_case_data/sandbox/kylin_job_conf.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index 8f5817e..64e69a4 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -76,4 +76,30 @@
         <description>Block replication</description>
     </property>
 
+    <!-- memory configuration for sandbox -->
+    <property>
+        <name>mapreduce.map.memory.mb</name>
+        <value>384</value>
+    </property>
+
+    <property>
+        <name>mapreduce.map.java.opts</name>
+        <value>-Xmx350m</value>
+    </property>
+
+    <property>
+        <name>mapreduce.task.io.sort.mb</name>
+        <value>30</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.memory.mb</name>
+        <value>384</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.java.opts</name>
+        <value>-Xmx350m</value>
+    </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/8b276caf/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
index b05495f..5094d24 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf_inmem.xml
@@ -75,23 +75,30 @@
         <description>Block replication</description>
     </property>
 
-    <!--Additional config for in-mem cubing, giving mapper more memory -->
+    <!-- memory configuration for sandbox -->
     <property>
         <name>mapreduce.map.memory.mb</name>
         <value>768</value>
-        <description></description>
     </property>
 
     <property>
         <name>mapreduce.map.java.opts</name>
         <value>-Xmx700m</value>
-        <description></description>
     </property>
 
     <property>
         <name>mapreduce.task.io.sort.mb</name>
-        <value>10</value>
-        <description></description>
+        <value>30</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.memory.mb</name>
+        <value>512</value>
+    </property>
+
+    <property>
+        <name>mapreduce.reduce.java.opts</name>
+        <value>-Xmx400m</value>
     </property>
 
 </configuration>
\ No newline at end of file

Reply via email to