Repository: incubator-systemml
Updated Branches:
  refs/heads/master f8f423c3b -> 17ea62a31


New pass-through of mapred/mapreduce configurations via sysml config

There are various use cases where SystemML users might want to change
mapreduce configurations on a per-script-invocation basis without
affecting the global cluster configuration (e.g., in a production
cluster). This new pass-through configuration feature allows users to
specify arbitrary mapred/mapreduce configurations in the SystemML
configuration file: all configurations with the prefix 'mapred' or
'mapreduce' are put into the JobConf of all MR jobs generated by
SystemML. For example, the following custom configuration

<mapreduce.map.java.opts>-Xmx2048m -Xms2048m -Xmn256m</mapreduce.map.java.opts>

allows users to specify the map-task JVM arguments, including memory
configurations (which can also change the runtime plans generated by
SystemML). This gives users full flexibility, even with regard to
configurations added in the future. However, the user is responsible for
a consistent configuration (e.g., a proper container configuration on
YARN, i.e., mapreduce.map.memory.mb set to something like 3072 in this case).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/17ea62a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/17ea62a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/17ea62a3

Branch: refs/heads/master
Commit: 17ea62a3182aecdee83b328a9ba30b1fb2f9b24c
Parents: f8f423c
Author: Matthias Boehm <[email protected]>
Authored: Mon Feb 15 21:16:17 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Mon Feb 15 22:32:20 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/conf/DMLConfig.java   | 35 ++++++++++++++++++++
 .../parfor/DataPartitionerRemoteMR.java         |  3 ++
 .../controlprogram/parfor/RemoteDPParForMR.java |  3 ++
 .../controlprogram/parfor/RemoteParForMR.java   |  3 ++
 .../parfor/ResultMergeRemoteMR.java             |  6 ++++
 .../apache/sysml/runtime/matrix/CMCOVMR.java    |  6 ++++
 .../sysml/runtime/matrix/CSVReblockMR.java      | 10 +++++-
 .../apache/sysml/runtime/matrix/CombineMR.java  |  6 ++++
 .../apache/sysml/runtime/matrix/DataGenMR.java  |  3 ++
 .../org/apache/sysml/runtime/matrix/GMR.java    |  5 ++-
 .../sysml/runtime/matrix/GroupedAggMR.java      |  6 ++++
 .../org/apache/sysml/runtime/matrix/MMCJMR.java |  3 ++
 .../org/apache/sysml/runtime/matrix/MMRJMR.java |  3 ++
 .../apache/sysml/runtime/matrix/ReblockMR.java  |  6 +++-
 .../org/apache/sysml/runtime/matrix/SortMR.java |  6 ++++
 .../apache/sysml/runtime/matrix/WriteCSVMR.java |  6 +++-
 .../matrix/mapred/MRJobConfiguration.java       | 15 +++++++++
 .../gdfo/SystemML-config-globalopt.xml          |  4 +++
 18 files changed, 125 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/conf/DMLConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java 
b/src/main/java/org/apache/sysml/conf/DMLConfig.java
index a7312d0..df682c8 100644
--- a/src/main/java/org/apache/sysml/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.HashMap;
+import java.util.Map;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -69,6 +70,10 @@ public class DMLConfig
        public static final String CP_PARALLEL_MATRIXMULT = 
"cp.parallel.matrixmult";
        public static final String CP_PARALLEL_TEXTIO   = "cp.parallel.textio";
 
+       // supported prefixes for custom map/reduce configurations: config tags
+       // whose names start with one of these prefixes are collected by
+       // getCustomMRConfig() and set into the JobConf of generated MR jobs
+       // (note: PREFIX_MAPREDUCE itself starts with PREFIX_MAPRED)
+       public static final String PREFIX_MAPRED = "mapred";
+       public static final String PREFIX_MAPREDUCE = "mapreduce";
+       
        //internal config
        public static final String DEFAULT_SHARED_DIR_PERMISSION = "777"; //for 
local fs and DFS
        public static String LOCAL_MR_MODE_STAGING_DIR = null;
@@ -304,6 +309,36 @@ public class DMLConfig
        }
 
        /**
+        * Get a map of key/value pairs of all configurations whose tag name
+        * starts with the prefix 'mapred' or 'mapreduce' (e.g.,
+        * mapreduce.map.java.opts). Values are the trimmed text content of
+        * the respective elements; elements without text content (e.g., an
+        * empty tag) are skipped instead of causing a NullPointerException.
+        * 
+        * @return map of tag name to value; empty if there is no config xml tree
+        */
+       public Map<String, String> getCustomMRConfig()
+       {
+               HashMap<String, String> ret = new HashMap<String, String>();
+       
+               //check for non-existing config xml tree
+               if( xml_root == null )
+                       return ret;
+               
+               //get all mapred.* and mapreduce.* tag / value pairs
+               //(note: 'mapreduce' also starts with 'mapred', listed for clarity)
+               NodeList list = xml_root.getElementsByTagName("*");
+               for( int i=0; list!=null && i<list.getLength(); i++ ) {
+                       if( !(list.item(i) instanceof Element) )
+                               continue;
+                       Element elem = (Element) list.item(i);
+                       String name = elem.getNodeName();
+                       if( !name.startsWith(PREFIX_MAPRED) && !name.startsWith(PREFIX_MAPREDUCE) )
+                               continue;
+                       
+                       //use the text content rather than getFirstChild().getNodeValue(),
+                       //which fails with an NPE on empty tags (no child node)
+                       String value = elem.getTextContent().trim();
+                       if( !value.isEmpty() )
+                               ret.put(name, value);
+               }
+               
+               return ret;
+       }
+       
+       /**
         * 
         * @return
         * @throws DMLRuntimeException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index 0d45af0..9f609bb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -179,6 +179,9 @@ public class DataPartitionerRemoteMR extends DataPartitioner
                        DMLConfig config = ConfigurationManager.getConfig();
                        DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
                        
+                       //set up custom map/reduce configurations 
+                       MRJobConfiguration.setupCustomMRConfigurations(job, 
config);
+                       
                        //set the max number of retries per map task
                        //  disabled job-level configuration to respect cluster 
configuration
                        //  note: this refers to hadoop2, hence it never had 
effect on mr1

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 4910023..2cbfd36 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -163,6 +163,9 @@ public class RemoteDPParForMR
                        DMLConfig config = ConfigurationManager.getConfig();
                        DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
                        
+                       //set up custom map/reduce configurations 
+                       MRJobConfiguration.setupCustomMRConfigurations(job, 
config);
+                       
                        //disable JVM reuse
                        job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited 
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 7253e34..94be120 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -167,6 +167,9 @@ public class RemoteParForMR
                        DMLConfig config = ConfigurationManager.getConfig();
                        DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
                        
+                       //set up custom map/reduce configurations 
+                       MRJobConfiguration.setupCustomMRConfigurations(job, 
config);
+                       
                        //enables the reuse of JVMs (multiple tasks per MR task)
                        if( jvmReuse )
                                job.setNumTasksToExecutePerJvm(-1); //unlimited

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 1bdafae..356ae3c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -291,6 +293,10 @@ public class ResultMergeRemoteMR extends ResultMerge
                        if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                                
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                        
+                       //set up custom map/reduce configurations 
+                       DMLConfig config = ConfigurationManager.getConfig();
+                       MRJobConfiguration.setupCustomMRConfigurations(job, 
config);
+                       
                        //enables the reuse of JVMs (multiple tasks per MR task)
                        if( _jvmReuse )
                                job.setNumTasksToExecutePerJvm(-1); //unlimited

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
index 86249c7..1747214 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CMCOVMR.java
@@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.matrix.data.CM_N_COVCell;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -87,6 +89,10 @@ public class CMCOVMR
                //set up the replication factor for the results
                job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
                
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                //set up what matrices are needed to pass from the mapper to 
reducer
                HashSet<Byte> 
mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, 
realIndexes, instructionsInMapper, null, 
                                cmNcomInstructions, resultIndexes);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
index fc47ce3..68611bb 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
@@ -314,6 +314,10 @@ public class CSVReblockMR
                
                //set up the replication factor for the results
                job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
+
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
                
                //set up the number of reducers
                job.setNumReduceTasks(1);
@@ -406,6 +410,10 @@ public class CSVReblockMR
                //set up preferred custom serialization framework for binary 
block format
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
                
                //set up what matrices are needed to pass from the mapper to 
reducer
                HashSet<Byte> 
mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, 
realIndexes,  null, 
@@ -418,7 +426,7 @@ public class CSVReblockMR
                MatrixCharacteristics[] stats=ret.stats;
                
                //set up the number of reducers
-               int numRed = WriteCSVMR.determineNumReducers(rlens, clens, 
ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS), 
ret.numReducerGroups);
+               int numRed = WriteCSVMR.determineNumReducers(rlens, clens, 
config.getIntValue(DMLConfig.NUM_REDUCERS), ret.numReducerGroups);
                job.setNumReduceTasks( numRed );
                
                // Print the complete instruction

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
index 5940951..99d561f 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CombineMR.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.instructions.mr.CombineBinaryInstruction;
 import org.apache.sysml.runtime.instructions.mr.CombineTernaryInstruction;
@@ -328,6 +330,10 @@ public class CombineMR
                
                //set up the replication factor for the results
                job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
+
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
                
                //set up what matrices are needed to pass from the mapper to 
reducer
                HashSet<Byte> 
mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, 
inputIndexes, null, null, combineInstructions, 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index 7d01d30..4cf2472 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -300,6 +300,9 @@ public class DataGenMR
                        DMLConfig config = ConfigurationManager.getConfig();
                        DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
                        
+                       //set up custom map/reduce configurations 
+                       MRJobConfiguration.setupCustomMRConfigurations(job, 
config);
+                       
                        //determine degree of parallelism (nmappers: 
1<=n<=capacity)
                        //TODO use maxsparsity whenever we have a way of 
generating sparse rand data
                        int capacity = 
InfrastructureAnalyzer.getRemoteParallelMapTasks();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/GMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
index 524ab6c..d1fdb87 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/GMR.java
@@ -214,7 +214,10 @@ public class GMR
                //set up map/reduce memory configurations (if in AM context)
                DMLConfig config = ConfigurationManager.getConfig();
                DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
-                       
+               
+               //set up custom map/reduce configurations 
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                //set up jvm reuse (incl. reuse of loaded dist cache matrices)
                if( jvmReuse )
                        job.setNumTasksToExecutePerJvm(-1);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
index 587c8a4..5a65ab0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/GroupedAggMR.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.Counters.Group;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -93,6 +95,10 @@ public class GroupedAggMR
                //set up the replication factor for the results
                job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
                
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                //set up what matrices are needed to pass from the mapper to 
reducer
                MRJobConfiguration.setUpOutputIndexesForMapper(job, 
realIndexes, null, null, 
                                grpAggInstructions, resultIndexes);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
index 39195f8..44513a6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MMCJMR.java
@@ -212,6 +212,9 @@ public class MMCJMR
                //set up map/reduce memory configurations (if in AM context)
                DMLConfig config = ConfigurationManager.getConfig();
                DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
+
+               //set up custom map/reduce configurations 
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
                
                byte[] resultIndexes=new 
byte[]{MRInstructionParser.parseSingleInstruction(aggBinInstrction).output};
                byte[] resultDimsUnknown_Array = new byte[]{resultDimsUnknown};

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
index 3832d21..2d120bc 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MMRJMR.java
@@ -122,6 +122,9 @@ public class MMRJMR
                DMLConfig config = ConfigurationManager.getConfig();
                DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
                                
+               //set up custom map/reduce configurations 
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                // byte[] resultIndexes=new 
byte[]{AggregateBinaryInstruction.parseMRInstruction(aggBinInstrction).output};
                
                //set up what matrices are needed to pass from the mapper to 
reducer

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
index d28d7e4..8dd930d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/ReblockMR.java
@@ -113,6 +113,10 @@ public class ReblockMR
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                //enable jvm reuse (based on SystemML configuration)
                if( jvmReuse )
                        job.setNumTasksToExecutePerJvm(-1);
@@ -128,7 +132,7 @@ public class ReblockMR
                MatrixCharacteristics[] stats=ret.stats;
                
                //set up the number of reducers (according to output size)
-               int numRed = determineNumReducers(rlens, clens, nnz, 
ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS), 
ret.numReducerGroups);
+               int numRed = determineNumReducers(rlens, clens, nnz, 
config.getIntValue(DMLConfig.NUM_REDUCERS), ret.numReducerGroups);
                job.setNumReduceTasks(numRed);
                
                //setup in-memory reduce buffers budget (reblock reducer dont 
need much memory)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 6502e57..8f56336 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.SortKeys;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -253,6 +255,10 @@ public class SortMR
            //setup replication factor
            job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
            
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+           
                MatrixCharacteristics[] s = new MatrixCharacteristics[1];
                s[0] = new MatrixCharacteristics(rlen, clen, brlen, bclen);
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
index 632db0c..33939f8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java
@@ -83,13 +83,17 @@ public class WriteCSVMR
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                
+               //set up custom map/reduce configurations 
+               DMLConfig config = ConfigurationManager.getConfig();
+               MRJobConfiguration.setupCustomMRConfigurations(job, config);
+               
                long maxRlen=0;
                for(long rlen: rlens)
                        if(rlen>maxRlen)
                                maxRlen=rlen;
                
                //set up the number of reducers (according to output size)
-               int numRed = determineNumReducers(rlens, clens, 
ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS), 
(int)maxRlen);
+               int numRed = determineNumReducers(rlens, clens, 
config.getIntValue(DMLConfig.NUM_REDUCERS), (int)maxRlen);
                job.setNumReduceTasks(numRed);
                
                byte[] resultDimsUnknown = new byte[resultIndexes.length];

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index f2a7897..7199477 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -1922,4 +1923,18 @@ public class MRJobConfiguration
                String frameworkClassBB = 
BinaryBlockSerialization.class.getCanonicalName();
                job.set(MRConfigurationNames.IO_SERIALIZATIONS, 
frameworkClassBB+","+frameworkList);
        }
+       
+       /**
+        * Copies every custom map/reduce configuration (tags with prefix
+        * 'mapred' or 'mapreduce', as collected by DMLConfig#getCustomMRConfig())
+        * from the given SystemML configuration into the given job configuration
+        * via JobConf#set.
+        * 
+        * @param job    job configuration that receives the custom entries
+        * @param config SystemML configuration to read the custom entries from
+        */
+       public static void setupCustomMRConfigurations( JobConf job, DMLConfig config ) {
+               Map<String,String> customConf = config.getCustomMRConfig();
+               for( String key : customConf.keySet() )
+                       job.set(key, customConf.get(key));
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17ea62a3/src/test/scripts/functions/gdfo/SystemML-config-globalopt.xml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/gdfo/SystemML-config-globalopt.xml 
b/src/test/scripts/functions/gdfo/SystemML-config-globalopt.xml
index 07e7ddd..1771351 100644
--- a/src/test/scripts/functions/gdfo/SystemML-config-globalopt.xml
+++ b/src/test/scripts/functions/gdfo/SystemML-config-globalopt.xml
@@ -53,4 +53,8 @@
    
    <!-- enables multi-threaded read/write of text formats in singlenode 
control program -->
    <cp.parallel.textio>true</cp.parallel.textio>
+   
+   
+   <!-- piggybacked test for custom mapred/mapreduce configurations -->
+   <mapreduce.task.io.sort.mb>50</mapreduce.task.io.sort.mb>
 </root>

Reply via email to