http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index d54ce97..cb9fd06 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -1,372 +1,372 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
-import org.apache.sysml.runtime.instructions.cp.IntObject;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- * Remote ParWorker implementation for the reducer side of fused
- * data partitioning and parfor execution in MR jobs.
- */
-public class RemoteDPParWorkerReducer extends ParWorker
-       implements Reducer<LongWritable, Writable, Writable, Writable>
-{
-
-       //MR data partitioning attributes
-       private String _inputVar = null;
-       private String _iterVar = null;
-       private PDataPartitionFormat _dpf = null;
-       private OutputInfo _info = null;
-       private int _rlen = -1;
-       private int _clen = -1;
-       private int _brlen = -1;
-       private int _bclen = -1;
-       
-       //reuse matrix partition
-       private MatrixBlock _partition = null; 
-       private boolean _tSparseCol = false;
-               
-       //MR ParWorker attributes  
-       protected String  _stringID       = null; 
-       protected HashMap<String, String> _rvarFnames = null; 
-
-       //cached collector/reporter
-       protected OutputCollector<Writable, Writable> _out = null;
-       protected Reporter _report = null;
-       
-       /**
-        * 
-        */
-       public RemoteDPParWorkerReducer() 
-       {
-               
-       }
-       
-       @Override
-       public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-               throws IOException 
-       {
-               //cache collector/reporter (for write in close)
-               _out = out;
-               _report = reporter;
-               
-               //collect input partition
-               if( _info == OutputInfo.BinaryBlockOutputInfo )
-                       _partition = collectBinaryBlock( valueList );
-               else
-                       _partition = collectBinaryCellInput( valueList );
-                       
-               //update in-memory matrix partition
-               MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
-               mo.setInMemoryPartition( _partition );
-               
-               //execute program
-               LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" 
("+_workerID+")");
-               try {
-                       //create tasks for input data
-                       Task lTask = new Task(TaskType.SET);
-                       lTask.addIteration( new IntObject(_iterVar,key.get()) );
-                       
-                       //execute program
-                       executeTask( lTask );
-               }
-               catch(Exception ex)
-               {
-                       throw new IOException("ParFOR: Failed to execute 
task.",ex);
-               }
-               
-               //statistic maintenance (after final export)
-               RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
-       }
-
-       /**
-        * 
-        */
-       @Override
-       public void configure(JobConf job)
-       {
-               //Step 1: configure data partitioning information
-               _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
-               _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
-               _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
-               _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
-               _iterVar = MRJobConfiguration.getPartitioningItervar( job );
-               _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
-               _dpf = MRJobConfiguration.getPartitioningFormat( job );         
-               switch( _dpf ) { //create matrix partition for reuse
-                       case ROW_WISE:    _rlen = 1; break;
-                       case COLUMN_WISE: _clen = 1; break;
-                       default:  throw new RuntimeException("Partition format 
not yet supported in fused partition-execute: "+_dpf);
-               }
-               _info = MRJobConfiguration.getPartitioningOutputInfo( job );
-               _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
-               if( _tSparseCol )
-                       _partition = new MatrixBlock((int)_clen, _rlen, true);
-               else
-                       _partition = new MatrixBlock((int)_rlen, _clen, false);
-
-               //Step 2: configure parworker
-               String taskID = job.get("mapred.tip.id");               
-               LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
-                       
-               try
-               {
-                       _stringID = taskID;
-                       _workerID = IDHandler.extractIntID(_stringID); //int task ID
-
-                       //use the given job configuration as source for all new job confs 
-                       //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar 
-                       //which includes a core-default.xml configuration which hides the actual default cluster configuration
-                       //in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
-                       if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                               ConfigurationManager.setCachedJobConf(job);
-                       }
-                       
-                       //create local runtime program
-                       String in = MRJobConfiguration.getProgramBlocks(job);
-                       ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
-                       _childBlocks = body.getChildBlocks();
-                       _ec          = body.getEc();                            
-                       _resultVars  = body.getResultVarNames();
-       
-                       //init local cache manager 
-                       if( !CacheableData.isCachingActive() ) {
-                               String uuid = IDHandler.createDistributedUniqueID();
-                               LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-                               CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-                       }
-                       if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-                               CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
-                       }
-                       
-                       //ensure that resultvar files are not removed
-                       super.pinResultVariables();
-               
-                       //enable/disable caching (if required)
-                       boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
-                       if( !cpCaching )
-                               CacheableData.disableCaching();
-
-                       _numTasks    = 0;
-                       _numIters    = 0;                       
-               }
-               catch(Exception ex)
-               {
-                       throw new RuntimeException(ex);
-               }
-               
-               //disable parfor stat monitoring, reporting execution times via counters not useful 
-               StatisticMonitor.disableStatMonitoring();
-               
-               //always reset stats because counters per map task (for case of JVM reuse)
-               if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
-               {
-                       CacheStatistics.reset();
-                       Statistics.reset();
-               }
-       }
-       
-       /**
-        * 
-        */
-       @Override
-       public void close() 
-           throws IOException 
-       {
-               try
-               {
-                       //write output if required (matrix indexed write)
-                       RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
-               
-                       //statistic maintenance (after final export)
-                       RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
-                       
-                       //print heavy hitters per task
-                       JobConf job = ConfigurationManager.getCachedJobConf();
-                       if( DMLScript.STATISTICS && 
!InfrastructureAnalyzer.isLocalMode(job) )
-                               LOG.info("\nSystemML Statistics:\nHeavy hitter 
instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));         
 
-               }
-               catch(Exception ex)
-               {
-                       throw new IOException( ex );
-               }
-               
-               //cleanup cache and local tmp dir
-               RemoteParForUtils.cleanupWorkingDirectories();
-               
-               //ensure caching is not disabled for CP in local mode
-               CacheableData.enableCaching();
-       }
-       
-       /**
-        * Collects a matrixblock partition from a given input iterator over 
-        * binary blocks.
-        * 
-        * Note it reuses the instance attribute _partition - multiple calls
-        * will overwrite the result.
-        * 
-        * @param valueList
-        * @return
-        * @throws IOException 
-        */
-       private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) 
-               throws IOException 
-       {
-               try
-               {
-                       //reset reuse block, keep configured representation
-                       _partition.reset(_rlen, _clen); 
-
-                       while( valueList.hasNext() )
-                       {
-                               PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
-                               int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
-                               int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
-                               MatrixBlock block = pairValue.block;
-                               if( !_partition.isInSparseFormat() ) //DENSE
-                               {
-                                       _partition.copy( row_offset, row_offset+block.getNumRows()-1, 
-                                                          col_offset, col_offset+block.getNumColumns()-1,
-                                                          pairValue.block, false ); 
-                               }
-                               else //SPARSE 
-                               {
-                                       _partition.appendToSparse(pairValue.block, row_offset, col_offset);
-                               }
-                       }
-
-                       //final partition cleanup
-                       cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
-               }
-               catch(DMLRuntimeException ex)
-               {
-                       throw new IOException(ex);
-               }
-               
-               return _partition;
-       }
-       
-       
-       /**
-        * Collects a matrixblock partition from a given input iterator over 
-        * binary cells.
-        * 
-        * Note it reuses the instance attribute _partition - multiple calls
-        * will overwrite the result.
-        * 
-        * @param valueList
-        * @return
-        * @throws IOException 
-        */
-       private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) 
-               throws IOException 
-       {
-               //reset reuse block, keep configured representation
-               if( _tSparseCol )
-                       _partition.reset(_clen, _rlen); 
-               else
-                       _partition.reset(_rlen, _clen);
-               
-               switch( _dpf )
-               {
-                       case ROW_WISE:
-                               while( valueList.hasNext() )
-                               {
-                                       PairWritableCell pairValue = (PairWritableCell)valueList.next();
-                                       if( pairValue.indexes.getColumnIndex()<0 )
-                                               continue; //cells used to ensure empty partitions
-                                       _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
-                               }
-                               break;
-                       case COLUMN_WISE:
-                               while( valueList.hasNext() )
-                               {
-                                       PairWritableCell pairValue = (PairWritableCell)valueList.next();
-                                       if( pairValue.indexes.getRowIndex()<0 )
-                                               continue; //cells used to ensure empty partitions
-                                       if( _tSparseCol )
-                                               _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
-                                       else
-                                               _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
-                               }
-                               break;
-                       default: 
-                               throw new IOException("Partition format not yet 
supported in fused partition-execute: "+_dpf);
-               }
-               
-               //final partition cleanup
-               cleanupCollectedMatrixPartition(_tSparseCol);
-               
-               return _partition;
-       }
-       
-       /**
-        * 
-        * @param sort
-        * @throws IOException
-        */
-       private void cleanupCollectedMatrixPartition(boolean sort) 
-               throws IOException
-       {
-               //sort sparse row contents if required
-               if( _partition.isInSparseFormat() && sort )
-                       _partition.sortSparseRows();
-
-               //ensure right number of nnz
-               if( !_partition.isInSparseFormat() )
-                       _partition.recomputeNonZeros();
-                       
-               //examine and switch dense/sparse representation
-               try {
-                       _partition.examSparsity();
-               }
-               catch(Exception ex){
-                       throw new IOException(ex);
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Remote ParWorker implementation for the reducer side of fused
+ * data partitioning and parfor execution in MR jobs.
+ */
+public class RemoteDPParWorkerReducer extends ParWorker
+       implements Reducer<LongWritable, Writable, Writable, Writable>
+{
+
+       //MR data partitioning attributes
+       private String _inputVar = null;
+       private String _iterVar = null;
+       private PDataPartitionFormat _dpf = null;
+       private OutputInfo _info = null;
+       private int _rlen = -1;
+       private int _clen = -1;
+       private int _brlen = -1;
+       private int _bclen = -1;
+       
+       //reuse matrix partition
+       private MatrixBlock _partition = null; 
+       private boolean _tSparseCol = false;
+               
+       //MR ParWorker attributes  
+       protected String  _stringID       = null; 
+       protected HashMap<String, String> _rvarFnames = null; 
+
+       //cached collector/reporter
+       protected OutputCollector<Writable, Writable> _out = null;
+       protected Reporter _report = null;
+       
+       /**
+        * 
+        */
+       public RemoteDPParWorkerReducer() 
+       {
+               
+       }
+       
+       @Override
+       public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
+               throws IOException 
+       {
+               //cache collector/reporter (for write in close)
+               _out = out;
+               _report = reporter;
+               
+               //collect input partition
+               if( _info == OutputInfo.BinaryBlockOutputInfo )
+                       _partition = collectBinaryBlock( valueList );
+               else
+                       _partition = collectBinaryCellInput( valueList );
+                       
+               //update in-memory matrix partition
+               MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
+               mo.setInMemoryPartition( _partition );
+               
+               //execute program
+               LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" 
("+_workerID+")");
+               try {
+                       //create tasks for input data
+                       Task lTask = new Task(TaskType.SET);
+                       lTask.addIteration( new IntObject(_iterVar,key.get()) );
+                       
+                       //execute program
+                       executeTask( lTask );
+               }
+               catch(Exception ex)
+               {
+                       throw new IOException("ParFOR: Failed to execute 
task.",ex);
+               }
+               
+               //statistic maintenance (after final export)
+               RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
+       }
+
+       /**
+        * 
+        */
+       @Override
+       public void configure(JobConf job)
+       {
+               //Step 1: configure data partitioning information
+               _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
+               _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
+               _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
+               _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+               _iterVar = MRJobConfiguration.getPartitioningItervar( job );
+               _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
+               _dpf = MRJobConfiguration.getPartitioningFormat( job );         
+               switch( _dpf ) { //create matrix partition for reuse
+                       case ROW_WISE:    _rlen = 1; break;
+                       case COLUMN_WISE: _clen = 1; break;
+                       default:  throw new RuntimeException("Partition format 
not yet supported in fused partition-execute: "+_dpf);
+               }
+               _info = MRJobConfiguration.getPartitioningOutputInfo( job );
+               _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job ); 
+               if( _tSparseCol )
+                       _partition = new MatrixBlock((int)_clen, _rlen, true);
+               else
+                       _partition = new MatrixBlock((int)_rlen, _clen, false);
+
+               //Step 2: configure parworker
+               String taskID = job.get("mapred.tip.id");               
+               LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
+                       
+               try
+               {
+                       _stringID = taskID;
+                       _workerID = IDHandler.extractIntID(_stringID); //int task ID
+
+                       //use the given job configuration as source for all new job confs 
+                       //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar 
+                       //which includes a core-default.xml configuration which hides the actual default cluster configuration
+                       //in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
+                       if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                               ConfigurationManager.setCachedJobConf(job);
+                       }
+                       
+                       //create local runtime program
+                       String in = MRJobConfiguration.getProgramBlocks(job);
+                       ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
+                       _childBlocks = body.getChildBlocks();
+                       _ec          = body.getEc();                            
+                       _resultVars  = body.getResultVarNames();
+       
+                       //init local cache manager 
+                       if( !CacheableData.isCachingActive() ) {
+                               String uuid = IDHandler.createDistributedUniqueID();
+                               LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+                               CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
+                       }
+                       if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
+                               CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+                       }
+                       
+                       //ensure that resultvar files are not removed
+                       super.pinResultVariables();
+               
+                       //enable/disable caching (if required)
+                       boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
+                       if( !cpCaching )
+                               CacheableData.disableCaching();
+
+                       _numTasks    = 0;
+                       _numIters    = 0;                       
+               }
+               catch(Exception ex)
+               {
+                       throw new RuntimeException(ex);
+               }
+               
+               //disable parfor stat monitoring, reporting execution times via counters not useful 
+               StatisticMonitor.disableStatMonitoring();
+               
+               //always reset stats because counters per map task (for case of JVM reuse)
+               if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+               {
+                       CacheStatistics.reset();
+                       Statistics.reset();
+               }
+       }
+       
+       /**
+        * 
+        */
+       @Override
+       public void close() 
+           throws IOException 
+       {
+               try
+               {
+                       //write output if required (matrix indexed write)
+                       RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
+               
+                       //statistic maintenance (after final export)
+                       RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
+                       
+                       //print heavy hitters per task
+                       JobConf job = ConfigurationManager.getCachedJobConf();
+                       if( DMLScript.STATISTICS && 
!InfrastructureAnalyzer.isLocalMode(job) )
+                               LOG.info("\nSystemML Statistics:\nHeavy hitter 
instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));         
 
+               }
+               catch(Exception ex)
+               {
+                       throw new IOException( ex );
+               }
+               
+               //cleanup cache and local tmp dir
+               RemoteParForUtils.cleanupWorkingDirectories();
+               
+               //ensure caching is not disabled for CP in local mode
+               CacheableData.enableCaching();
+       }
+       
+       /**
+        * Collects a matrixblock partition from a given input iterator over 
+        * binary blocks.
+        * 
+        * Note it reuses the instance attribute _partition - multiple calls
+        * will overwrite the result.
+        * 
+        * @param valueList
+        * @return
+        * @throws IOException 
+        */
+       private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList ) 
+               throws IOException 
+       {
+               try
+               {
+                       //reset reuse block, keep configured representation
+                       _partition.reset(_rlen, _clen); 
+
+                       while( valueList.hasNext() )
+                       {
+                               PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
+                               int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
+                               int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
+                               MatrixBlock block = pairValue.block;
+                               if( !_partition.isInSparseFormat() ) //DENSE
+                               {
+                                       _partition.copy( row_offset, row_offset+block.getNumRows()-1, 
+                                                          col_offset, col_offset+block.getNumColumns()-1,
+                                                          pairValue.block, false ); 
+                               }
+                               else //SPARSE 
+                               {
+                                       _partition.appendToSparse(pairValue.block, row_offset, col_offset);
+                               }
+                       }
+
+                       //final partition cleanup
+                       cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
+               }
+               catch(DMLRuntimeException ex)
+               {
+                       throw new IOException(ex);
+               }
+               
+               return _partition;
+       }
+       
+       
+       /**
+        * Collects a matrixblock partition from a given input iterator over 
+        * binary cells.
+        * 
+        * Note it reuses the instance attribute _partition - multiple calls
+        * will overwrite the result.
+        * 
+        * @param valueList
+        * @return
+        * @throws IOException 
+        */
+       private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList ) 
+               throws IOException 
+       {
+               //reset reuse block, keep configured representation
+               if( _tSparseCol )
+                       _partition.reset(_clen, _rlen); 
+               else
+                       _partition.reset(_rlen, _clen);
+               
+               switch( _dpf )
+               {
+                       case ROW_WISE:
+                               while( valueList.hasNext() )
+                               {
+                                       PairWritableCell pairValue = (PairWritableCell)valueList.next();
+                                       if( pairValue.indexes.getColumnIndex()<0 )
+                                               continue; //cells used to ensure empty partitions
+                                       _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
+                               }
+                               break;
+                       case COLUMN_WISE:
+                               while( valueList.hasNext() )
+                               {
+                                       PairWritableCell pairValue = (PairWritableCell)valueList.next();
+                                       if( pairValue.indexes.getRowIndex()<0 )
+                                               continue; //cells used to ensure empty partitions
+                                       if( _tSparseCol )
+                                               _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
+                                       else
+                                               _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
+                               }
+                               break;
+                       default: 
+                               throw new IOException("Partition format not yet 
supported in fused partition-execute: "+_dpf);
+               }
+               
+               //final partition cleanup
+               cleanupCollectedMatrixPartition(_tSparseCol);
+               
+               return _partition;
+       }
+       
+       /**
+        * 
+        * @param sort
+        * @throws IOException
+        */
+       private void cleanupCollectedMatrixPartition(boolean sort) 
+               throws IOException
+       {
+               //sort sparse row contents if required
+               if( _partition.isInSparseFormat() && sort )
+                       _partition.sortSparseRows();
+
+               //ensure right number of nnz
+               if( !_partition.isInSparseFormat() )
+                       _partition.recomputeNonZeros();
+                       
+               //examine and switch dense/sparse representation
+               try {
+                       _partition.examSparsity();
+               }
+               catch(Exception ex){
+                       throw new IOException(ex);
+               }
+       }
+}
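
For readers skimming the diff, the core of collectBinaryBlock above is the mapping from 1-based MR block indexes to cell offsets in the reused _partition buffer. The following standalone sketch (Java 16+) isolates that arithmetic; Block and BlockIndex are hypothetical stand-ins for SystemML's MatrixBlock and MatrixIndexes, and only the (index-1)*blockSize offset computation is taken from the code above.

public class BlockStitchSketch {
    static final int BRLEN = 2, BCLEN = 2;   // block dimensions
    static final int RLEN = 4, CLEN = 4;     // partition dimensions

    record BlockIndex(long rowIndex, long colIndex) {} // 1-based, as in the MR shuffle
    record Block(BlockIndex idx, double[][] data) {}

    public static void main(String[] args) {
        double[][] partition = new double[RLEN][CLEN]; // reused buffer, like _partition
        Block[] blocks = {
            new Block(new BlockIndex(1, 1), new double[][]{{1,2},{3,4}}),
            new Block(new BlockIndex(2, 2), new double[][]{{5,6},{7,8}})
        };
        for (Block b : blocks) {
            // same arithmetic as the reducer: 1-based block index -> cell offset
            int rowOffset = (int)(b.idx().rowIndex() - 1) * BRLEN;
            int colOffset = (int)(b.idx().colIndex() - 1) * BCLEN;
            for (int i = 0; i < b.data().length; i++)
                for (int j = 0; j < b.data()[i].length; j++)
                    partition[rowOffset + i][colOffset + j] = b.data()[i][j];
        }
        System.out.println(java.util.Arrays.deepToString(partition));
    }
}

Running it prints the 4x4 partition with the two 2x2 blocks placed on the diagonal, which is exactly the stitching the dense branch of collectBinaryBlock performs via _partition.copy.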

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 2426bc2..67beff6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -1,266 +1,266 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- * Common functionalities for parfor workers in MR jobs. Used by worker wrappers in
- * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor)
- * 
- */
-public class RemoteParForUtils 
-{
-       
-       /**
-        * 
-        * @param reporter
-        * @param deltaTasks
-        * @param deltaIterations
-        */
-       public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
-       {
-               //report parfor counters
-               if( deltaTasks>0 )
-                       reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
-               if( deltaIterations>0 )
-                       reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
-               
-               JobConf job = ConfigurationManager.getCachedJobConf();
-               if( DMLScript.STATISTICS  && 
!InfrastructureAnalyzer.isLocalMode(job) ) 
-               {
-                       //report cache statistics
-                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
-                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
-                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
-                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
-               
-                       //reset cache statistics to prevent overlapping reporting
-                       CacheStatistics.reset();
-               }
-       }
-       
-       /**
-        * 
-        * @param workerID
-        * @param vars
-        * @param resultVars
-        * @param out
-        * @throws DMLRuntimeException
-        * @throws IOException
-        */
-       public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) 
-                       throws DMLRuntimeException, IOException
-       {
-               exportResultVariables(workerID, vars, resultVars, null, out);
-       }       
-       
-       /**
-        * For remote MR parfor workers.
-        * 
-        * @param workerID
-        * @param vars
-        * @param resultVars
-        * @param rvarFnames
-        * @param out
-        * @throws DMLRuntimeException
-        * @throws IOException
-        */
-       public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, 
-                                                         HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) 
-               throws DMLRuntimeException, IOException
-       {
-               //create key and value for reuse
-               LongWritable okey = new LongWritable( workerID ); 
-               Text ovalue = new Text();
-               
-               //for each result variable, probe if export is necessary
-               for( String rvar : resultVars )
-               {
-                       Data dat = vars.get( rvar );
-                       
-                       //export output variable to HDFS (see RunMRJobs)
-                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
-                       {
-                               MatrixObject mo = (MatrixObject) dat;
-                               if( mo.isDirty() )
-                               {
-                                       if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
-                                       {
-                                               String fname = rvarFnames.get( rvar );
-                                               if( fname!=null )
-                                                       mo.setFileName( fname );
-                                                       
-                                               //export result var (iff actually modified in parfor)
-                                               mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
-                                               rvarFnames.put(rvar, mo.getFileName()); 
-                                       }
-                                       else
-                                       {
-                                               //export result var (iff actually modified in parfor)
-                                               mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
-                                       }
-                                       
-                                       //pass output vars (scalars by value, matrix by ref) to result
-                                       //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
-                                       String datStr = ProgramConverter.serializeDataObject(rvar, mo);
-                                       ovalue.set( datStr );
-                                       out.collect( okey, ovalue );
-                               }
-                       }       
-               }
-       }
-       
-       /**
-        * For remote Spark parfor workers. This is a simplified version compared to MR.
-        * 
-        * @param workerID
-        * @param vars
-        * @param resultVars
-        * @throws DMLRuntimeException
-        * @throws IOException
-        */
-       public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars) 
-               throws DMLRuntimeException, IOException
-       {
-               ArrayList<String> ret = new ArrayList<String>();
-               
-               //for each result variable, probe if export is necessary
-               for( String rvar : resultVars )
-               {
-                       Data dat = vars.get( rvar );
-                       
-                       //export output variable to HDFS (see RunMRJobs)
-                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
-                       {
-                               MatrixObject mo = (MatrixObject) dat;
-                               if( mo.isDirty() )
-                               {
-                                       //export result var (iff actually modified in parfor)
-                                       mo.exportData(); 
-                                       
-                                       
-                                       //pass output vars (scalars by value, matrix by ref) to result
-                                       //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
-                                       ret.add( ProgramConverter.serializeDataObject(rvar, mo) );
-                               }
-                       }       
-               }
-               
-               return ret;
-       }
-               
-       
-       /**
-        * Cleanup all temporary files created by this SystemML process
-        * instance.
-        * 
-        */
-       public static void cleanupWorkingDirectories()
-       {
-               //use the given job configuration for infrastructure analysis (see configure);
-               //this is important for robustness w/ misconfigured classpath which also contains
-               //core-default.xml and hence hides the actual cluster configuration; otherwise
-               //there is missing cleanup of working directories 
-               JobConf job = ConfigurationManager.getCachedJobConf();
-               
-               if( !InfrastructureAnalyzer.isLocalMode(job) )
-               {
-                       //delete cache files
-                       CacheableData.cleanupCacheDir();
-                       //disable caching (prevent dynamic eviction)
-                       CacheableData.disableCaching();
-                       //cleanup working dir (e.g., of CP_FILE instructions)
-                       LocalFileUtils.cleanupWorkingDirectory();
-               }
-       }
-       
-       /**
-        * 
-        * @param out
-        * @param LOG
-        * @return
-        * @throws DMLRuntimeException
-        */
-       public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) 
-               throws DMLRuntimeException
-       {
-               HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
-
-               int countAll = 0;
-               for( Tuple2<Long,String> entry : out )
-               {
-                       Long key = entry._1();
-                       String val = entry._2();
-                       if( !tmp.containsKey( key ) )
-                               tmp.put(key, new LocalVariableMap());
-                       Object[] dat = ProgramConverter.parseDataObject( val );
-                       tmp.get(key).put((String)dat[0], (Data)dat[1]);
-                       countAll++;
-               }
-
-               if( LOG != null ) {
-                       LOG.debug("Num remote worker results (before 
deduplication): "+countAll);
-                       LOG.debug("Num remote worker results: "+tmp.size());
-               }
-               
-               //create return array
-               return tmp.values().toArray(new LocalVariableMap[0]);   
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import scala.Tuple2;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Common functionalities for parfor workers in MR jobs. Used by worker wrappers in
+ * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor)
+ * 
+ */
+public class RemoteParForUtils 
+{
+       
+       /**
+        * 
+        * @param reporter
+        * @param deltaTasks
+        * @param deltaIterations
+        */
+       public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
+       {
+               //report parfor counters
+               if( deltaTasks>0 )
+                       reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
+               if( deltaIterations>0 )
+                       reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
+               
+               JobConf job = ConfigurationManager.getCachedJobConf();
+               if( DMLScript.STATISTICS  && 
!InfrastructureAnalyzer.isLocalMode(job) ) 
+               {
+                       //report cache statistics
+                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
+                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
+                       reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
+                       reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
+               
+                       //reset cache statistics to prevent overlapping reporting
+                       CacheStatistics.reset();
+               }
+       }
+       
+       /**
+        * 
+        * @param workerID
+        * @param vars
+        * @param resultVars
+        * @param out
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+       public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out ) 
+                       throws DMLRuntimeException, IOException
+       {
+               exportResultVariables(workerID, vars, resultVars, null, out);
+       }       
+       
+       /**
+        * For remote MR parfor workers.
+        * 
+        * @param workerID
+        * @param vars
+        * @param resultVars
+        * @param rvarFnames
+        * @param out
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+       public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, 
+                                                         HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) 
+               throws DMLRuntimeException, IOException
+       {
+               //create key and value for reuse
+               LongWritable okey = new LongWritable( workerID ); 
+               Text ovalue = new Text();
+               
+               //for each result variable, probe if export is necessary
+               for( String rvar : resultVars )
+               {
+                       Data dat = vars.get( rvar );
+                       
+                       //export output variable to HDFS (see RunMRJobs)
+                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
+                       {
+                               MatrixObject mo = (MatrixObject) dat;
+                               if( mo.isDirty() )
+                               {
+                                       if( 
ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
+                                       {
+                                               String fname = rvarFnames.get( 
rvar );
+                                               if( fname!=null )
+                                                       mo.setFileName( fname );
+                                                       
+                                               //export result var (iff 
actually modified in parfor)
+                                               mo.exportData(); //note: this 
is equivalent to doing it in close (currently not required because one task 
maps to exactly one map task, hence only one map invocation)
+                                               rvarFnames.put(rvar, 
mo.getFileName()); 
+                                       }
+                                       else
+                                       {
+                                               //export result var (iff 
actually modified in parfor)
+                                               mo.exportData(); //note: this 
is equivalent to doing it in close (currently not required because one task 
maps to exactly one map task, hence only one map invocation)
+                                       }
+                                       
+                                       //pass output vars (scalars by value, 
matrix by ref) to result
+                                       //(only if actually exported, hence 
inside the dirty check; otherwise potential problems in result merge)
+                                       String datStr = 
ProgramConverter.serializeDataObject(rvar, mo);
+                                       ovalue.set( datStr );
+                                       out.collect( okey, ovalue );
+                               }
+                       }       
+               }
+       }
+       
+       /**
+        * For remote Spark parfor workers. This is a simplified version 
+        * compared to MR.
+        * 
+        * @param workerID id of the parfor worker producing the results
+        * @param vars local variable map of the worker
+        * @param resultVars names of the result variables to export
+        * @return list of serialized result variables
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+       public static ArrayList<String> exportResultVariables( long workerID, 
LocalVariableMap vars, ArrayList<String> resultVars) 
+               throws DMLRuntimeException, IOException
+       {
+               ArrayList<String> ret = new ArrayList<String>();
+               
+                       //for each result variable, probe if export necessary
+               for( String rvar : resultVars )
+               {
+                       Data dat = vars.get( rvar );
+                       
+                       //export output variable to HDFS (see RunMRJobs)
+                       if ( dat != null && dat.getDataType() == 
DataType.MATRIX ) 
+                       {
+                               MatrixObject mo = (MatrixObject) dat;
+                               if( mo.isDirty() )
+                               {
+                                       //export result var (iff actually 
modified in parfor)
+                                       mo.exportData(); 
+                                       
+                                       
+                                       //pass output vars (scalars by value, 
matrix by ref) to result
+                                       //(only if actually exported, hence 
inside the dirty check; otherwise potential problems in result merge)
+                                       ret.add( 
ProgramConverter.serializeDataObject(rvar, mo) );
+                               }
+                       }       
+               }
+               
+               return ret;
+       }
+               
+       
+       /**
+        * Cleanup all temporary files created by this SystemML process
+        * instance.
+        * 
+        */
+       public static void cleanupWorkingDirectories()
+       {
+               //use the given job configuration for infrastructure analysis 
(see configure);
+               //this is important for robustness w/ misconfigured classpath 
which also contains
+               //core-default.xml and hence hides the actual cluster 
configuration; otherwise
+               //there is missing cleanup of working directories 
+               JobConf job = ConfigurationManager.getCachedJobConf();
+               
+               if( !InfrastructureAnalyzer.isLocalMode(job) )
+               {
+                       //delete cache files
+                       CacheableData.cleanupCacheDir();
+                       //disable caching (prevent dynamic eviction)
+                       CacheableData.disableCaching();
+                       //cleanup working dir (e.g., of CP_FILE instructions)
+                       LocalFileUtils.cleanupWorkingDirectory();
+               }
+       }
+       
+       /**
+        * Merges the serialized results of all remote workers into one
+        * local variable map per distinct worker.
+        * 
+        * @param out list of (workerID, serialized var) result pairs
+        * @param LOG optional log for debug output, may be null
+        * @return array with one local variable map per worker
+        * @throws DMLRuntimeException
+        */
+       public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> 
out, Log LOG ) 
+               throws DMLRuntimeException
+       {
+               HashMap<Long,LocalVariableMap> tmp = new 
HashMap<Long,LocalVariableMap>();
+
+               int countAll = 0;
+               for( Tuple2<Long,String> entry : out )
+               {
+                       Long key = entry._1();
+                       String val = entry._2();
+                       if( !tmp.containsKey( key ) )
+                               tmp.put(key, new LocalVariableMap());
+                       Object[] dat = ProgramConverter.parseDataObject( val );
+                       tmp.get(key).put((String)dat[0], (Data)dat[1]);
+                       countAll++;
+               }
+
+               if( LOG != null ) {
+                       LOG.debug("Num remote worker results (before 
deduplication): "+countAll);
+                       LOG.debug("Num remote worker results: "+tmp.size());
+               }
+               
+               //create return array
+               return tmp.values().toArray(new LocalVariableMap[0]);   
+       }
+}
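
A note on the getResults merge above: results arrive as (workerID, serialized
variable) pairs and are grouped into one variable map per worker, which also
deduplicates results of re-executed tasks. A minimal standalone sketch of the
same grouping pattern, using illustrative string-valued variables instead of
SystemML Data objects:

  import java.util.AbstractMap.SimpleEntry;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public class WorkerResultMergeSketch {
      //groups (workerID, "name=value") pairs into one map per worker;
      //duplicate (worker, name) pairs simply overwrite each other
      public static Map<Long, Map<String,String>> merge(
          List<SimpleEntry<Long,String>> out)
      {
          Map<Long, Map<String,String>> tmp = new HashMap<>();
          for( SimpleEntry<Long,String> e : out ) {
              Map<String,String> vars =
                  tmp.computeIfAbsent(e.getKey(), k -> new HashMap<>());
              String[] kv = e.getValue().split("=", 2); //parse "name=value"
              vars.put(kv[0], kv[1]);
          }
          return tmp;
      }

      public static void main(String[] args) {
          List<SimpleEntry<Long,String>> out = new ArrayList<>();
          out.add(new SimpleEntry<>(1L, "R=m1"));
          out.add(new SimpleEntry<>(1L, "R=m1")); //duplicate, collapsed
          out.add(new SimpleEntry<>(2L, "R=m2"));
          System.out.println(merge(out)); //{1={R=m1}, 2={R=m2}}
      }
  }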

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
index 6f478bc..e6556fa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
@@ -1,58 +1,58 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-#PerfTestTool: DML template for estimation cost functions.
-
-dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer 
n) 
-return (Matrix[Double] D) 
-implemented in 
(classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem")
 
-
-dynWrite = externalFunction(Matrix[Double] R, String fname) 
-return (Matrix[Double] D) 
-implemented in 
(classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem")
 
-
-solve = externalFunction(Matrix[Double] A, Matrix[Double] y) 
-return (Matrix[Double] b) 
-implemented in 
(classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem")
 
-
-k = %numModels%;
-m = -1; 
-n = -1;
-
-dummy = matrix(1,rows=1,cols=1); 
-
-for( i in 1:k, par=8, mode=LOCAL )
-{
-   sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";   
-   sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";   
-   
-   D = dynRead( dummy, sin1, m, n );
-   y = dynRead( dummy, sin2, m, 1 );
-   
-   A = t(D) %*% D; # X'X
-   b = t(D) %*% y; # X'y
-   beta = solve(A,b); 
-
-   sout = "./conf/PerfTestTool/"+i+"_out.csv";   
-   
-   X=dynWrite( beta, sout );
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+#PerfTestTool: DML template for estimation cost functions.
+
+dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer 
n) 
+return (Matrix[Double] D) 
+implemented in 
(classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem")
 
+
+dynWrite = externalFunction(Matrix[Double] R, String fname) 
+return (Matrix[Double] D) 
+implemented in 
(classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem")
 
+
+solve = externalFunction(Matrix[Double] A, Matrix[Double] y) 
+return (Matrix[Double] b) 
+implemented in 
(classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem")
 
+
+k = %numModels%;
+m = -1; 
+n = -1;
+
+dummy = matrix(1,rows=1,cols=1); 
+
+for( i in 1:k, par=8, mode=LOCAL )
+{
+   sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";   
+   sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";   
+   
+   D = dynRead( dummy, sin1, m, n );
+   y = dynRead( dummy, sin2, m, 1 );
+   
+   A = t(D) %*% D; # X'X
+   b = t(D) %*% y; # X'y
+   beta = solve(A,b); 
+
+   sout = "./conf/PerfTestTool/"+i+"_out.csv";   
+   
+   X=dynWrite( beta, sout );
 }
\ No newline at end of file
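
In the DML template above, solve(A, b) returns the per-model regression
coefficients beta with A %*% beta = b, where A = t(D) %*% D and b = t(D) %*% y
are the normal equations of a least-squares fit. A minimal self-contained
sketch of such a dense linear solve (Gaussian elimination with partial
pivoting; purely illustrative, not the actual LinearSolverWrapperCP
implementation):

  public class NormalEquationsSketch {
      //solves A*b = y for square A via Gaussian elimination w/ pivoting
      public static double[] solve(double[][] A, double[] y) {
          int n = y.length;
          double[][] M = new double[n][n+1]; //augmented matrix [A|y]
          for( int i=0; i<n; i++ ) {
              System.arraycopy(A[i], 0, M[i], 0, n);
              M[i][n] = y[i];
          }
          for( int p=0; p<n; p++ ) {
              int max = p; //pick largest pivot for numerical stability
              for( int i=p+1; i<n; i++ )
                  if( Math.abs(M[i][p]) > Math.abs(M[max][p]) )
                      max = i;
              double[] t = M[p]; M[p] = M[max]; M[max] = t;
              for( int i=p+1; i<n; i++ ) { //eliminate column p below pivot
                  double f = M[i][p] / M[p][p];
                  for( int j=p; j<=n; j++ )
                      M[i][j] -= f * M[p][j];
              }
          }
          double[] b = new double[n]; //back substitution
          for( int i=n-1; i>=0; i-- ) {
              double s = M[i][n];
              for( int j=i+1; j<n; j++ )
                  s -= M[i][j] * b[j];
              b[i] = s / M[i][i];
          }
          return b;
      }
  }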

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 1dd419f..13c68e2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -1,64 +1,64 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.lang.ref.SoftReference;
-
-import org.apache.spark.broadcast.Broadcast;
-
-public class BroadcastObject extends LineageObject
-{
-       //soft reference storage for graceful cleanup in case of memory pressure
-       private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
-       
-       public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName 
)
-       {
-               _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
-               _varName = varName;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public PartitionedBroadcastMatrix getBroadcast()
-       {
-               return _bcHandle.get();
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public boolean isValid() 
-       {
-               //check for evicted soft reference
-               PartitionedBroadcastMatrix pbm = _bcHandle.get();
-               if( pbm == null )
-                       return false;
-               
-               //check for validity of individual broadcasts
-               Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
-               for( Broadcast<PartitionedMatrixBlock> bc : tmp )
-                       if( !bc.isValid() )
-                               return false;           
-               return true;
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.lang.ref.SoftReference;
+
+import org.apache.spark.broadcast.Broadcast;
+
+public class BroadcastObject extends LineageObject
+{
+       //soft reference storage for graceful cleanup in case of memory pressure
+       private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
+       
+       public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName 
)
+       {
+               _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
+               _varName = varName;
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public PartitionedBroadcastMatrix getBroadcast()
+       {
+               return _bcHandle.get();
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public boolean isValid() 
+       {
+               //check for evicted soft reference
+               PartitionedBroadcastMatrix pbm = _bcHandle.get();
+               if( pbm == null )
+                       return false;
+               
+               //check for validity of individual broadcasts
+               Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
+               for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+                       if( !bc.isValid() )
+                               return false;           
+               return true;
+       }
+}
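
The broadcast handle above is held via a SoftReference, so the JVM may reclaim
the partitioned broadcast under memory pressure and consumers have to
re-validate the handle before every use. A minimal generic sketch of this
pattern (class name illustrative):

  import java.lang.ref.SoftReference;

  public class SoftHandleSketch<T> {
      //the GC may clear a SoftReference under memory pressure,
      //so the handle must be re-validated before every use
      private final SoftReference<T> _handle;

      public SoftHandleSketch(T value) {
          _handle = new SoftReference<T>(value);
      }

      public T get() {
          return _handle.get(); //null if already reclaimed
      }

      public boolean isValid() {
          return _handle.get() != null;
      }
  }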

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
index b2bb62c..bcf37bb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
@@ -1,83 +1,83 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-
-public abstract class LineageObject 
-{
-
-       //basic lineage information
-       protected int _numRef = -1;
-       protected List<LineageObject> _childs = null;
-       protected String _varName = null;
-       
-       //N:1 back reference to matrix object
-       protected MatrixObject _mo = null;
-       
-       protected LineageObject()
-       {
-               _numRef = 0;
-               _childs = new ArrayList<LineageObject>();
-       }
-       
-       public String getVarName() {
-               return _varName;
-       }
-       
-       public int getNumReferences()
-       {
-               return _numRef;
-       }
-       
-       public void setBackReference(MatrixObject mo)
-       {
-               _mo = mo;
-       }
-       
-       public boolean hasBackReference()
-       {
-               return (_mo != null);
-       }
-       
-       public void incrementNumReferences()
-       {
-               _numRef++;
-       }
-       
-       public void decrementNumReferences()
-       {
-               _numRef--;
-       }
-       
-       public List<LineageObject> getLineageChilds()
-       {
-               return _childs;
-       }
-       
-       public void addLineageChild(LineageObject lob)
-       {
-               lob.incrementNumReferences();
-               _childs.add( lob );
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+
+public abstract class LineageObject 
+{
+
+       //basic lineage information
+       protected int _numRef = -1;
+       protected List<LineageObject> _childs = null;
+       protected String _varName = null;
+       
+       //N:1 back reference to matrix object
+       protected MatrixObject _mo = null;
+       
+       protected LineageObject()
+       {
+               _numRef = 0;
+               _childs = new ArrayList<LineageObject>();
+       }
+       
+       public String getVarName() {
+               return _varName;
+       }
+       
+       public int getNumReferences()
+       {
+               return _numRef;
+       }
+       
+       public void setBackReference(MatrixObject mo)
+       {
+               _mo = mo;
+       }
+       
+       public boolean hasBackReference()
+       {
+               return (_mo != null);
+       }
+       
+       public void incrementNumReferences()
+       {
+               _numRef++;
+       }
+       
+       public void decrementNumReferences()
+       {
+               _numRef--;
+       }
+       
+       public List<LineageObject> getLineageChilds()
+       {
+               return _childs;
+       }
+       
+       public void addLineageChild(LineageObject lob)
+       {
+               lob.incrementNumReferences();
+               _childs.add( lob );
+       }
+}
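
LineageObject above implements plain reference counting: addLineageChild
increments the child's counter, and a consumer decrements it again when done,
so a node can only be cleaned up once its count returns to zero and no matrix
object back-references it. A small standalone sketch of that protocol (names
illustrative):

  public class RefCountSketch {
      private int _numRef = 0;

      public void incrementNumReferences() { _numRef++; }
      public void decrementNumReferences() { _numRef--; }

      //a lineage node is only safe to clean up once no parent
      //(and no back-referencing matrix object) still uses it
      public boolean isCleanupSafe(boolean hasBackReference) {
          return _numRef <= 0 && !hasBackReference;
      }
  }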

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index fb7e773..605e7ca 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import org.apache.spark.api.java.JavaPairRDD;
-
-public class RDDObject extends LineageObject
-{
-
-       private JavaPairRDD<?,?> _rddHandle = null;
-       
-       //meta data on origin of given rdd handle
-       private boolean _checkpointed = false; //created via checkpoint 
instruction
-       private boolean _hdfsfile = false;     //created from hdfs file
-       private String  _hdfsFname = null;     //hdfs filename, if created from 
hdfs.  
-       
-       public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
-       {
-               _rddHandle = rddvar;
-               _varName = varName;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public JavaPairRDD<?,?> getRDD()
-       {
-               return _rddHandle;
-       }
-       
-       public void setCheckpointRDD( boolean flag )
-       {
-               _checkpointed = flag;
-       }
-       
-       public boolean isCheckpointRDD() 
-       {
-               return _checkpointed;
-       }
-       
-       public void setHDFSFile( boolean flag ) {
-               _hdfsfile = flag;
-       }
-       
-       public void setHDFSFilename( String fname ) {
-               _hdfsFname = fname;
-       }
-       
-       public boolean isHDFSFile() {
-               return _hdfsfile;
-       }
-       
-       public String getHDFSFilename() {
-               return _hdfsFname;
-       }
-       
-
-       /**
-        * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
-        * in both cases, we can directly read the file instead of collecting
-        * the given rdd.
-        * 
-        * @return
-        */
-       public boolean allowsShortCircuitRead()
-       {
-               boolean ret = isHDFSFile();
-               
-               if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
-                       LineageObject lo = getLineageChilds().get(0);
-                       ret = ( lo instanceof RDDObject && 
((RDDObject)lo).isHDFSFile() );
-               }
-               
-               return ret;
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public boolean allowsShortCircuitCollect()
-       {
-               return ( isCheckpointRDD() && getLineageChilds().size() == 1
-                            && getLineageChilds().get(0) instanceof RDDObject 
);
-       }
-       
-       /**
-        * 
-        * @return
-        */
-       public boolean rHasCheckpointRDDChilds()
-       {
-               //probe for checkpoint rdd
-               if( _checkpointed )
-                       return true;
-               
-               //process childs recursively
-               boolean ret = false;
-               for( LineageObject lo : getLineageChilds() ) {
-                       if( lo instanceof RDDObject )
-                               ret |= 
((RDDObject)lo).rHasCheckpointRDDChilds();
-               }
-               
-               return ret;
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class RDDObject extends LineageObject
+{
+
+       private JavaPairRDD<?,?> _rddHandle = null;
+       
+       //meta data on origin of given rdd handle
+       private boolean _checkpointed = false; //created via checkpoint 
instruction
+       private boolean _hdfsfile = false;     //created from hdfs file
+       private String  _hdfsFname = null;     //hdfs filename, if created from 
hdfs.  
+       
+       public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
+       {
+               _rddHandle = rddvar;
+               _varName = varName;
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public JavaPairRDD<?,?> getRDD()
+       {
+               return _rddHandle;
+       }
+       
+       public void setCheckpointRDD( boolean flag )
+       {
+               _checkpointed = flag;
+       }
+       
+       public boolean isCheckpointRDD() 
+       {
+               return _checkpointed;
+       }
+       
+       public void setHDFSFile( boolean flag ) {
+               _hdfsfile = flag;
+       }
+       
+       public void setHDFSFilename( String fname ) {
+               _hdfsFname = fname;
+       }
+       
+       public boolean isHDFSFile() {
+               return _hdfsfile;
+       }
+       
+       public String getHDFSFilename() {
+               return _hdfsFname;
+       }
+       
+
+       /**
+        * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
+        * in both cases, we can directly read the file instead of collecting
+        * the given rdd.
+        * 
+        * @return
+        */
+       public boolean allowsShortCircuitRead()
+       {
+               boolean ret = isHDFSFile();
+               
+               if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
+                       LineageObject lo = getLineageChilds().get(0);
+                       ret = ( lo instanceof RDDObject && 
((RDDObject)lo).isHDFSFile() );
+               }
+               
+               return ret;
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public boolean allowsShortCircuitCollect()
+       {
+               return ( isCheckpointRDD() && getLineageChilds().size() == 1
+                            && getLineageChilds().get(0) instanceof RDDObject 
);
+       }
+       
+       /**
+        * 
+        * @return
+        */
+       public boolean rHasCheckpointRDDChilds()
+       {
+               //probe for checkpoint rdd
+               if( _checkpointed )
+                       return true;
+               
+               //process childs recursively
+               boolean ret = false;
+               for( LineageObject lo : getLineageChilds() ) {
+                       if( lo instanceof RDDObject )
+                               ret |= 
((RDDObject)lo).rHasCheckpointRDDChilds();
+               }
+               
+               return ret;
+       }
+}
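
rHasCheckpointRDDChilds above is a standard recursive probe over the lineage
DAG: answer true if the current node is checkpointed, otherwise ask all
children. A compact standalone sketch of the same traversal over a simplified
node type (illustrative only):

  import java.util.ArrayList;
  import java.util.List;

  class ProbeNode {
      boolean checkpointed = false;
      List<ProbeNode> childs = new ArrayList<>();

      boolean hasCheckpointChilds() {
          if( checkpointed ) //probe current node
              return true;
          boolean ret = false;
          for( ProbeNode c : childs ) //process childs recursively
              ret |= c.hasCheckpointChilds();
          return ret;
      }
  }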

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
index e561f3c..6cb0830 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
@@ -1,167 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.mapred;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.sysml.runtime.functionobjects.CM;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
-import org.apache.sysml.runtime.instructions.cp.KahanObject;
-import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
-import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.WeightedCell;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysml.runtime.matrix.operators.CMOperator;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-
-
-public class GroupedAggMRCombiner extends ReduceBase
-       implements Reducer<TaggedMatrixIndexes, WeightedCell, 
TaggedMatrixIndexes, WeightedCell>
-{      
-       //grouped aggregate instructions
-       private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = 
new HashMap<Byte, GroupedAggregateInstruction>();
-       
-       //reused intermediate objects
-       private CM_COV_Object cmObj = new CM_COV_Object(); 
-       private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
-       private WeightedCell outCell = new WeightedCell();
-
-       @Override
-       public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> 
values,
-                                  OutputCollector<TaggedMatrixIndexes, 
WeightedCell> out, Reporter reporter)
-               throws IOException 
-       {
-               long start = System.currentTimeMillis();
-               
-               //get aggregate operator
-               GroupedAggregateInstruction ins = 
grpaggInstructions.get(key.getTag());
-               Operator op = ins.getOperator();
-               boolean isPartialAgg = true;
-               
-               //combine iterator to single value
-               try
-               {
-                       if(op instanceof CMOperator) //everything except sum
-                       {
-                               if( ((CMOperator) 
op).isPartialAggregateOperator() )
-                               {
-                                       cmObj.reset();
-                                       CM lcmFn = cmFn.get(key.getTag());
-                                       
-                                       //partial aggregate cm operator 
-                                       while( values.hasNext() )
-                                       {
-                                               WeightedCell 
value=values.next();
-                                               lcmFn.execute(cmObj, 
value.getValue(), value.getWeight());                              
-                                       }
-                                       
-                                       
outCell.setValue(cmObj.getRequiredPartialResult(op));
-                                       outCell.setWeight(cmObj.getWeight());   
-                               }
-                               else //forward tuples to reducer
-                               {
-                                       isPartialAgg = false; 
-                                       while( values.hasNext() )
-                                               out.collect(key, values.next());
-                               }                               
-                       }
-                       else if(op instanceof AggregateOperator) //sum
-                       {
-                               AggregateOperator aggop=(AggregateOperator) op;
-                                       
-                               if( aggop.correctionExists )
-                               {
-                                       KahanObject buffer=new 
KahanObject(aggop.initialValue, 0);
-                                       
-                                       KahanPlus.getKahanPlusFnObject();
-                                       
-                                       //partial aggregate with correction
-                                       while( values.hasNext() )
-                                       {
-                                               WeightedCell 
value=values.next();
-                                               
aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
-                                       }
-                                       
-                                       outCell.setValue(buffer._sum);
-                                       outCell.setWeight(1);
-                               }
-                               else //no correction
-                               {
-                                       double v = aggop.initialValue;
-                                       
-                                       //partial aggregate without correction
-                                       while(values.hasNext())
-                                       {
-                                               WeightedCell 
value=values.next();
-                                               v=aggop.increOp.fn.execute(v, 
value.getValue()*value.getWeight());
-                                       }
-                                       
-                                       outCell.setValue(v);
-                                       outCell.setWeight(1);
-                               }                               
-                       }
-                       else
-                               throw new IOException("Unsupported operator in 
instruction: " + ins);
-               }
-               catch(Exception ex)
-               {
-                       throw new IOException(ex);
-               }
-               
-               //collect the output (to reducer)
-               if( isPartialAgg )
-                       out.collect(key, outCell);
-               
-               reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, 
System.currentTimeMillis()-start);
-       }
-
-       @Override
-       public void configure(JobConf job)
-       {
-               try 
-               {
-                       GroupedAggregateInstruction[] grpaggIns = 
MRJobConfiguration.getGroupedAggregateInstructions(job);
-                       if( grpaggIns != null ) 
-                               for(GroupedAggregateInstruction ins : grpaggIns)
-                               {
-                                       grpaggInstructions.put(ins.output, 
ins);        
-                                       if( ins.getOperator() instanceof 
CMOperator )
-                                               cmFn.put(ins.output, 
CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
-                               }
-               } 
-               catch (Exception e) 
-               {
-                       throw new RuntimeException(e);
-               } 
-       }
-       
-       @Override
-       public void close()
-       {
-               //do nothing, overrides unnecessary handling in superclass
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
+import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.CMOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+
+
+public class GroupedAggMRCombiner extends ReduceBase
+       implements Reducer<TaggedMatrixIndexes, WeightedCell, 
TaggedMatrixIndexes, WeightedCell>
+{      
+       //grouped aggregate instructions
+       private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = 
new HashMap<Byte, GroupedAggregateInstruction>();
+       
+       //reused intermediate objects
+       private CM_COV_Object cmObj = new CM_COV_Object(); 
+       private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
+       private WeightedCell outCell = new WeightedCell();
+
+       @Override
+       public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> 
values,
+                                  OutputCollector<TaggedMatrixIndexes, 
WeightedCell> out, Reporter reporter)
+               throws IOException 
+       {
+               long start = System.currentTimeMillis();
+               
+               //get aggregate operator
+               GroupedAggregateInstruction ins = 
grpaggInstructions.get(key.getTag());
+               Operator op = ins.getOperator();
+               boolean isPartialAgg = true;
+               
+               //combine iterator to single value
+               try
+               {
+                       if(op instanceof CMOperator) //everything except sum
+                       {
+                               if( ((CMOperator) 
op).isPartialAggregateOperator() )
+                               {
+                                       cmObj.reset();
+                                       CM lcmFn = cmFn.get(key.getTag());
+                                       
+                                       //partial aggregate cm operator 
+                                       while( values.hasNext() )
+                                       {
+                                               WeightedCell 
value=values.next();
+                                               lcmFn.execute(cmObj, 
value.getValue(), value.getWeight());                              
+                                       }
+                                       
+                                       
outCell.setValue(cmObj.getRequiredPartialResult(op));
+                                       outCell.setWeight(cmObj.getWeight());   
+                               }
+                               else //forward tuples to reducer
+                               {
+                                       isPartialAgg = false; 
+                                       while( values.hasNext() )
+                                               out.collect(key, values.next());
+                               }                               
+                       }
+                       else if(op instanceof AggregateOperator) //sum
+                       {
+                               AggregateOperator aggop=(AggregateOperator) op;
+                                       
+                               if( aggop.correctionExists )
+                               {
+                                       KahanObject buffer=new 
KahanObject(aggop.initialValue, 0);
+                                       
+                                       KahanPlus.getKahanPlusFnObject();
+                                       
+                                       //partial aggregate with correction
+                                       while( values.hasNext() )
+                                       {
+                                               WeightedCell 
value=values.next();
+                                               
aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
+                                       }
+                                       
+                                       outCell.setValue(buffer._sum);
+                                       outCell.setWeight(1);
+                               }
+                               else //no correction
+                               {
+                                       double v = aggop.initialValue;
+                                       
+                                       //partial aggregate without correction
+                                       while(values.hasNext())
+                                       {
+                                               WeightedCell 
value=values.next();
+                                               v=aggop.increOp.fn.execute(v, 
value.getValue()*value.getWeight());
+                                       }
+                                       
+                                       outCell.setValue(v);
+                                       outCell.setWeight(1);
+                               }                               
+                       }
+                       else
+                               throw new IOException("Unsupported operator in 
instruction: " + ins);
+               }
+               catch(Exception ex)
+               {
+                       throw new IOException(ex);
+               }
+               
+               //collect the output (to reducer)
+               if( isPartialAgg )
+                       out.collect(key, outCell);
+               
+               reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, 
System.currentTimeMillis()-start);
+       }
+
+       @Override
+       public void configure(JobConf job)
+       {
+               try 
+               {
+                       GroupedAggregateInstruction[] grpaggIns = 
MRJobConfiguration.getGroupedAggregateInstructions(job);
+                       if( grpaggIns != null ) 
+                               for(GroupedAggregateInstruction ins : grpaggIns)
+                               {
+                                       grpaggInstructions.put(ins.output, 
ins);        
+                                       if( ins.getOperator() instanceof 
CMOperator )
+                                               cmFn.put(ins.output, 
CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
+                               }
+               } 
+               catch (Exception e) 
+               {
+                       throw new RuntimeException(e);
+               } 
+       }
+       
+       @Override
+       public void close()
+       {
+               //do nothing, overrides unnecessary handling in superclass
+       }
+}
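
When aggop.correctionExists, the combiner above accumulates into a
KahanObject, i.e., it uses compensated (Kahan) summation, where a separate
correction term recovers the low-order bits lost by each floating-point add.
A standalone sketch of the underlying scheme (illustrative; the actual logic
lives in KahanPlus/KahanObject):

  public class KahanSketch {
      private double _sum = 0, _correction = 0;

      public void add(double v) {
          double corrected = v - _correction;
          double newSum = _sum + corrected;
          _correction = (newSum - _sum) - corrected; //lost low-order bits
          _sum = newSum;
      }

      public double get() { return _sum; }

      public static void main(String[] args) {
          KahanSketch k = new KahanSketch();
          for( int i=0; i<1000000; i++ )
              k.add(0.1);
          //compensated total; a naive loop accumulates
          //noticeably more rounding error
          System.out.println(k.get());
      }
  }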

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java 
b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
index a08631d..fa9843a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
@@ -1,84 +1,84 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class IndexSortComparable implements WritableComparable
-{
-       
-       protected DoubleWritable _dval = null;
-       protected LongWritable _lval = null; 
-       
-       public IndexSortComparable()
-       {
-               _dval = new DoubleWritable();
-               _lval = new LongWritable();
-       }
-       
-       public void set(double dval, long lval)
-       {
-               _dval.set(dval);
-               _lval.set(lval);
-       }
-
-       @Override
-       public void readFields(DataInput arg0)
-               throws IOException 
-       {
-               _dval.readFields(arg0);
-               _lval.readFields(arg0);
-       }
-
-       @Override
-       public void write(DataOutput arg0) 
-               throws IOException 
-       {
-               _dval.write(arg0);
-               _lval.write(arg0);
-       }
-
-       @Override
-       public int compareTo(Object o) 
-       {
-               //compare only double value (e.g., for partitioner)
-               if( o instanceof DoubleWritable ) {
-                       return _dval.compareTo((DoubleWritable) o);
-               }
-               //compare double value and index (e.g., for stable sort)
-               else if( o instanceof IndexSortComparable) {
-                       IndexSortComparable that = (IndexSortComparable)o;
-                       int tmp = _dval.compareTo(that._dval);
-                       if( tmp==0 ) //secondary sort
-                               tmp = _lval.compareTo(that._lval);
-                       return tmp;
-               }       
-               else {
-                       throw new RuntimeException("Unsupported comparison 
involving class: "+o.getClass().getName());
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+@SuppressWarnings("rawtypes")
+public class IndexSortComparable implements WritableComparable
+{
+       
+       protected DoubleWritable _dval = null;
+       protected LongWritable _lval = null; 
+       
+       public IndexSortComparable()
+       {
+               _dval = new DoubleWritable();
+               _lval = new LongWritable();
+       }
+       
+       public void set(double dval, long lval)
+       {
+               _dval.set(dval);
+               _lval.set(lval);
+       }
+
+       @Override
+       public void readFields(DataInput arg0)
+               throws IOException 
+       {
+               _dval.readFields(arg0);
+               _lval.readFields(arg0);
+       }
+
+       @Override
+       public void write(DataOutput arg0) 
+               throws IOException 
+       {
+               _dval.write(arg0);
+               _lval.write(arg0);
+       }
+
+       @Override
+       public int compareTo(Object o) 
+       {
+               //compare only double value (e.g., for partitioner)
+               if( o instanceof DoubleWritable ) {
+                       return _dval.compareTo((DoubleWritable) o);
+               }
+               //compare double value and index (e.g., for stable sort)
+               else if( o instanceof IndexSortComparable) {
+                       IndexSortComparable that = (IndexSortComparable)o;
+                       int tmp = _dval.compareTo(that._dval);
+                       if( tmp==0 ) //secondary sort
+                               tmp = _lval.compareTo(that._lval);
+                       return tmp;
+               }       
+               else {
+                       throw new RuntimeException("Unsupported comparison 
involving class: "+o.getClass().getName());
+               }
+       }
+}
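
The compareTo above orders primarily by the double value and breaks ties by
the long index, which yields the deterministic total order needed for a
stable sort. A standalone sketch of the same two-level comparison on plain
(value, index) arrays (illustrative):

  import java.util.Arrays;

  public class SecondarySortSketch {
      public static void main(String[] args) {
          //(value, index) pairs; equal values are ordered by index
          double[][] data = { {0.7, 2}, {0.3, 1}, {0.7, 0} };
          Arrays.sort(data, (a, b) -> {
              int tmp = Double.compare(a[0], b[0]);
              return (tmp != 0) ? tmp
                  : Double.compare(a[1], b[1]); //secondary sort on index
          });
          System.out.println(Arrays.deepToString(data));
          //[[0.3, 1.0], [0.7, 0.0], [0.7, 2.0]]
      }
  }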
