KYLIN-1839 code review

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa1550c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa1550c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa1550c9

Branch: refs/heads/master
Commit: aa1550c9ecfb1c047c107fa51187a51e958ab189
Parents: 4473d71
Author: Yang Li <liy...@apache.org>
Authored: Wed Oct 19 08:19:35 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Wed Oct 19 08:19:35 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     | 44 +++++++++++---------
 1 file changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa1550c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index bbb1711..f70e3bb 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -80,7 +79,7 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
     protected static final Option OPTION_JOB_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job
 name. For example, 
Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
     protected static final Option OPTION_CUBE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube
 name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
     protected static final Option OPTION_CUBING_JOB_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID
 of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
-//    @Deprecated
+    //    @Deprecated
     protected static final Option OPTION_SEGMENT_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube
 segment name").create(BatchConstants.ARG_SEGMENT_NAME);
     protected static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube
 segment id").create(BatchConstants.ARG_SEGMENT_ID);
     protected static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input
 path").create(BatchConstants.ARG_INPUT);
@@ -253,10 +252,10 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
         // for KylinJobMRLibDir
         String mrLibDir = kylinConf.getKylinJobMRLibDir();
         if (!StringUtils.isBlank(mrLibDir)) {
-                if(kylinDependency.length() > 0) {
-                        kylinDependency.append(",");
-                }
-                kylinDependency.append(mrLibDir);
+            if (kylinDependency.length() > 0) {
+                kylinDependency.append(",");
+            }
+            kylinDependency.append(mrLibDir);
         }
 
         setJobTmpJarsAndFiles(job, kylinDependency.toString());
@@ -296,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
 
         try {
             Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.getLocal(jobConf);
+            FileSystem localfs = FileSystem.getLocal(jobConf);
             FileSystem hdfs = FileSystem.get(jobConf);
 
             StringBuilder jarList = new StringBuilder();
@@ -304,20 +303,29 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
 
             for (String fileName : fNameList) {
                 Path p = new Path(fileName);
-                FileSystem current = 
(fileName.startsWith(HdfsConstants.HDFS_URI_SCHEME) ? hdfs : fs);
-                if(!current.exists(p)) {
-                    logger.warn("The directory '" + fileName + "for kylin 
dependency does not exist!!!");
+                if (p.isAbsolute() == false) {
+                    logger.warn("The directory of kylin dependency '" + 
fileName + "' is not absolute, skip");
+                    continue;
+                }
+                FileSystem fs;
+                if (hdfs.exists(p)) {
+                    fs = hdfs;
+                } else if (localfs.exists(p)) {
+                    fs = localfs;
+                } else {
+                    logger.warn("The directory of kylin dependency '" + 
fileName + "' does not exist, skip");
                     continue;
                 }
-                if (current.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, current, fileName);
+                
+                if (fs.getFileStatus(p).isDirectory()) {
+                    appendTmpDir(job, fs, p);
                     continue;
                 }
 
                 StringBuilder list = (p.getName().endsWith(".jar")) ? jarList 
: fileList;
                 if (list.length() > 0)
                     list.append(",");
-                list.append(current.getFileStatus(p).getPath());
+                list.append(fs.getFileStatus(p).getPath());
             }
 
             appendTmpFiles(fileList.toString(), jobConf);
@@ -327,13 +335,10 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
         }
     }
 
-    private void appendTmpDir(Job job, FileSystem fs, String tmpDir) {
-        if (StringUtils.isBlank(tmpDir))
-            return;
-
+    private void appendTmpDir(Job job, FileSystem fs, Path tmpDir) {
         try {
             Configuration jobConf = job.getConfiguration();
-            FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+            FileStatus[] fList = fs.listStatus(tmpDir);
 
             StringBuilder jarList = new StringBuilder();
             StringBuilder fileList = new StringBuilder();
@@ -341,7 +346,7 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
             for (FileStatus file : fList) {
                 Path p = file.getPath();
                 if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, fs, p.toString());
+                    appendTmpDir(job, fs, p);
                     continue;
                 }
 
@@ -624,4 +629,3 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
     }
 
 }
-

Reply via email to