[SYSTEMML-2326] Fix distributed cache umask issues in local mode 

This patch fixes test issues in local mode on servers with restrictive
umask settings. In such environments, the MR distributed cache does not
allow adding "hdfs" files (which actually reside in the local file system
with restricted permissions) to the public cache. We now simply do not
add files to the distributed cache in local mode, because all relevant
jobs already fall back to reading from local files in local mode.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/31580f52
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/31580f52
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/31580f52

Branch: refs/heads/master
Commit: 31580f52d318fda5d9ec34297cd6cb4a401b83df
Parents: 9426001
Author: Matthias Boehm <[email protected]>
Authored: Mon May 14 20:07:09 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon May 14 20:07:09 2018 -0700

----------------------------------------------------------------------
 .../sysml/runtime/matrix/CSVReblockMR.java      |  7 ++++--
 .../org/apache/sysml/runtime/matrix/SortMR.java | 23 ++++++++++----------
 .../matrix/mapred/MRJobConfiguration.java       | 19 ++++++++--------
 3 files changed, 26 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/31580f52/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
index 89c1bee..97379dd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -442,8 +443,10 @@ public class CSVReblockMR
                //set unique working dir
                MRJobConfiguration.setUniqueWorkingDir(job);
                Path cachefile=new Path(counterFile, "part-00000");
-               DistributedCache.addCacheFile(cachefile.toUri(), job);
-               DistributedCache.createSymlink(job);
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       DistributedCache.addCacheFile(cachefile.toUri(), job);
+                       DistributedCache.createSymlink(job);
+               }
                job.set(ROWID_FILE_NAME, cachefile.toString());
                
                RunningJob runjob=JobClient.runJob(job);

http://git-wip-us.apache.org/repos/asf/systemml/blob/31580f52/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index d8b7aed..bbd0ef3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -239,17 +239,18 @@ public class SortMR
                    job.setReducerClass(ValueSortReducer.class);        
                    job.setOutputKeyClass(outputInfo.outputKeyClass); //double
                    job.setOutputValueClass(outputInfo.outputValueClass); //int
-           }
-           job.setPartitionerClass(TotalOrderPartitioner.class);
-           
-           
-           //setup distributed cache
-           DistributedCache.addCacheFile(partitionUri, job);
-           DistributedCache.createSymlink(job);
-           
-           //setup replication factor
-           job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
-           
+               }
+               job.setPartitionerClass(TotalOrderPartitioner.class);
+
+               //setup distributed cache
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       DistributedCache.addCacheFile(partitionUri, job);
+                       DistributedCache.createSymlink(job);
+               }
+
+               //setup replication factor
+               job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
+
                //set up custom map/reduce configurations 
                DMLConfig config = ConfigurationManager.getDMLConfig();
                MRJobConfiguration.setupCustomMRConfigurations(job, config);

http://git-wip-us.apache.org/repos/asf/systemml/blob/31580f52/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 1db292f..7efefbd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -914,11 +914,13 @@ public class MRJobConfiguration
                job.set(DISTCACHE_INPUT_PATHS, pathsString);
                Path p = null;
                
-               for(String spath : paths) {
-                       p = new Path(spath);
-                       
-                       DistributedCache.addCacheFile(p.toUri(), job);
-                       DistributedCache.createSymlink(job);
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       for(String spath : paths) {
+                               p = new Path(spath);
+                               
+                               DistributedCache.addCacheFile(p.toUri(), job);
+                               DistributedCache.createSymlink(job);
+                       }
                }
        }
        
@@ -1132,16 +1134,13 @@ public class MRJobConfiguration
                                outputInfos, inBlockRepresentation, false);
        }
 
-       public static String setUpSortPartitionFilename( JobConf job ) 
-       {
+       public static String setUpSortPartitionFilename( JobConf job ) {
                String pfname = constructPartitionFilename();
                job.set( SORT_PARTITION_FILENAME, pfname );
-               
                return pfname;
        }
 
-       public static String getSortPartitionFilename( JobConf job )
-       {
+       public static String getSortPartitionFilename( JobConf job ) {
                return job.get( SORT_PARTITION_FILENAME );
        }
        

Reply via email to