http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 1dd3600..06a2005 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -84,8 +84,8 @@ public class SparkExecutionContext extends ExecutionContext
 {
        private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
        private static final boolean LDEBUG = false; //local debug flag
-       
-       //internal configurations 
+
+       //internal configurations
        private static boolean LAZY_SPARKCTX_CREATION = true;
        private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
 
@@ -93,16 +93,16 @@ public class SparkExecutionContext extends ExecutionContext
 
        //executor memory and relative fractions as obtained from the spark 
configuration
        private static SparkClusterConfig _sconf = null;
-       
-       //singleton spark context (as there can be only one spark context per 
JVM) 
-       private static JavaSparkContext _spctx = null; 
-       
-       //registry of parallelized RDDs to enforce that at any time, we spent 
at most 
+
+       //singleton spark context (as there can be only one spark context per 
JVM)
+       private static JavaSparkContext _spctx = null;
+
+       //registry of parallelized RDDs to enforce that at any time, we spent 
at most
        //10% of JVM max heap size for parallelized RDDs; if this is not 
sufficient,
        //matrices or frames are exported to HDFS and the RDDs are created from 
files.
        //TODO unify memory management for CP, par RDDs, and potentially 
broadcasts
        private static MemoryManagerParRDDs _parRDDs = new 
MemoryManagerParRDDs(0.1);
-       
+
        static {
                // for internal debugging only
                if( LDEBUG ) {
@@ -111,31 +111,31 @@ public class SparkExecutionContext extends ExecutionContext
                }
        }
 
-       protected SparkExecutionContext(boolean allocateVars, Program prog) 
+       protected SparkExecutionContext(boolean allocateVars, Program prog)
        {
                //protected constructor to force use of ExecutionContextFactory
                super( allocateVars, prog );
-                               
+
                //spark context creation via internal initializer
                if( !(LAZY_SPARKCTX_CREATION && 
OptimizerUtils.isHybridExecutionMode()) ) {
                        initSparkContext();
                }
        }
-               
+
        /**
         * Returns the used singleton spark context. In case of lazy spark 
context
         * creation, this methods blocks until the spark context is created.
-        *  
+        *
         * @return java spark context
         */
        public JavaSparkContext getSparkContext()
        {
-               //lazy spark context creation on demand (lazy instead of 
asynchronous 
+               //lazy spark context creation on demand (lazy instead of 
asynchronous
                //to avoid wait for uninitialized spark context on close)
                if( LAZY_SPARKCTX_CREATION ) {
                        initSparkContext();
                }
-               
+
                //return the created spark context
                return _spctx;
        }
@@ -144,11 +144,11 @@ public class SparkExecutionContext extends ExecutionContext
                initSparkContext();
                return _spctx;
        }
-       
+
        /**
         * Indicates if the spark context has been created or has
         * been passed in from outside.
-        * 
+        *
         * @return true if spark context created
         */
        public synchronized static boolean isSparkContextCreated() {
@@ -159,26 +159,25 @@ public class SparkExecutionContext extends ExecutionContext
                _spctx = null;
        }
 
-       public void close() 
+       public void close()
        {
                synchronized( SparkExecutionContext.class ) {
-                       if( _spctx != null ) 
+                       if( _spctx != null )
                        {
                                //stop the spark context if existing
                                _spctx.stop();
-                               
+
                                //make sure stopped context is never used again
-                               _spctx = null; 
+                               _spctx = null;
                        }
-                               
+
                }
        }
-       
+
        public static boolean isLazySparkContextCreation(){
                return LAZY_SPARKCTX_CREATION;
        }
 
-       @SuppressWarnings("deprecation")
        private synchronized static void initSparkContext()
        {
                //check for redundant spark context init
@@ -186,24 +185,19 @@ public class SparkExecutionContext extends ExecutionContext
                        return;
 
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               
+
                //create a default spark context (master, appname, etc refer to 
system properties
                //as given in the spark configuration or during spark-submit)
-               
+
                Object mlCtxObj = MLContextProxy.getActiveMLContext();
-               if(mlCtxObj != null) 
+               if(mlCtxObj != null)
                {
                        // This is when DML is called through spark shell
                        // Will clean the passing of static variables later as 
this involves minimal change to DMLScript
-                       if (mlCtxObj instanceof org.apache.sysml.api.MLContext) 
{
-                               org.apache.sysml.api.MLContext mlCtx = 
(org.apache.sysml.api.MLContext) mlCtxObj;
-                               _spctx = new 
JavaSparkContext(mlCtx.getSparkContext());
-                       } else if (mlCtxObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                               org.apache.sysml.api.mlcontext.MLContext mlCtx 
= (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
-                               _spctx = 
MLContextUtil.getJavaSparkContext(mlCtx);
-                       }
+                       org.apache.sysml.api.mlcontext.MLContext mlCtx = 
(org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
+                       _spctx = MLContextUtil.getJavaSparkContext(mlCtx);
                }
-               else 
+               else
                {
                        if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
                                // For now set 4 cores for integration testing 
:)
@@ -220,128 +214,128 @@ public class SparkExecutionContext extends ExecutionContext
                                SparkConf conf = createSystemMLSparkConf();
                                _spctx = new JavaSparkContext(conf);
                        }
-                       
+
                        _parRDDs.clear();
                }
-               
-               // Set warning if spark.driver.maxResultSize is not set. It 
needs to be set before starting Spark Context for CP collect 
+
+               // Set warning if spark.driver.maxResultSize is not set. It 
needs to be set before starting Spark Context for CP collect
                String strDriverMaxResSize = 
_spctx.getConf().get("spark.driver.maxResultSize", "1g");
-               long driverMaxResSize = 
UtilFunctions.parseMemorySize(strDriverMaxResSize); 
+               long driverMaxResSize = 
UtilFunctions.parseMemorySize(strDriverMaxResSize);
                if (driverMaxResSize != 0 && 
driverMaxResSize<OptimizerUtils.getLocalMemBudget() && 
!DMLScript.USE_LOCAL_SPARK_CONFIG)
                        LOG.warn("Configuration parameter 
spark.driver.maxResultSize set to " + 
UtilFunctions.formatMemorySize(driverMaxResSize) + "."
-                                       + " You can set it through Spark 
default configuration setting either to 0 (unlimited) or to available memory 
budget of size " 
+                                       + " You can set it through Spark 
default configuration setting either to 0 (unlimited) or to available memory 
budget of size "
                                        + 
UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");
-               
+
                //globally add binaryblock serialization framework for all hdfs 
read/write operations
-               //TODO if spark context passed in from outside (mlcontext), we 
need to clean this up at the end 
+               //TODO if spark context passed in from outside (mlcontext), we 
need to clean this up at the end
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( 
_spctx.hadoopConfiguration() );
-               
+
                //statistics maintenance
                if( DMLScript.STATISTICS ){
                        Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
                }
-       }       
-       
+       }
+
        /**
         * Sets up a SystemML-preferred Spark configuration based on the 
implicit
         * default configuration (as passed via configurations from outside).
-        * 
+        *
         * @return spark configuration
         */
        public static SparkConf createSystemMLSparkConf() {
                SparkConf conf = new SparkConf();
-               
+
                //always set unlimited result size (required for cp collect)
                conf.set("spark.driver.maxResultSize", "0");
-               
+
                //always use the fair scheduler (for single jobs, it's 
equivalent to fifo
                //but for concurrent jobs in parfor it ensures better data 
locality because
                //round robin assignment mitigates the problem of 'sticky 
slots')
                if( FAIR_SCHEDULER_MODE ) {
                        conf.set("spark.scheduler.mode", "FAIR");
                }
-               
+
                //increase scheduler delay (usually more robust due to better 
data locality)
                if( !conf.contains("spark.locality.wait") ) { //default 3s
                        conf.set("spark.locality.wait", "5s");
                }
-               
+
                return conf;
        }
 
        /**
         * Spark instructions should call this for all matrix inputs except 
broadcast
         * variables.
-        * 
+        *
         * @param varname variable name
         * @return JavaPairRDD of MatrixIndexes-MatrixBlocks
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname ) 
-               throws DMLRuntimeException 
+       public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockRDDHandleForVariable( String varname )
+               throws DMLRuntimeException
        {
                return (JavaPairRDD<MatrixIndexes,MatrixBlock>) 
getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
        }
-       
+
        /**
         * Spark instructions should call this for all frame inputs except 
broadcast
         * variables.
-        * 
+        *
         * @param varname variable name
         * @return JavaPairRDD of Longs-FrameBlocks
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<Long,FrameBlock> 
getFrameBinaryBlockRDDHandleForVariable( String varname ) 
-               throws DMLRuntimeException 
+       public JavaPairRDD<Long,FrameBlock> 
getFrameBinaryBlockRDDHandleForVariable( String varname )
+               throws DMLRuntimeException
        {
                JavaPairRDD<Long,FrameBlock> out = 
(JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, 
InputInfo.BinaryBlockInputInfo);
                return out;
        }
 
-       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo ) 
+       public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, 
InputInfo inputInfo )
                throws DMLRuntimeException
        {
                Data dat = getVariable(varname);
                if( dat instanceof MatrixObject ) {
                        MatrixObject mo = getMatrixObject(varname);
-                       return getRDDHandleForMatrixObject(mo, inputInfo);      
+                       return getRDDHandleForMatrixObject(mo, inputInfo);
                }
                else if( dat instanceof FrameObject ) {
                        FrameObject fo = getFrameObject(varname);
-                       return getRDDHandleForFrameObject(fo, inputInfo);       
+                       return getRDDHandleForFrameObject(fo, inputInfo);
                }
                else {
                        throw new DMLRuntimeException("Failed to obtain RDD for 
data type other than matrix or frame.");
                }
        }
-       
+
        /**
-        * This call returns an RDD handle for a given matrix object. This 
includes 
-        * the creation of RDDs for in-memory or binary-block HDFS data. 
-        * 
+        * This call returns an RDD handle for a given matrix object. This 
includes
+        * the creation of RDDs for in-memory or binary-block HDFS data.
+        *
         * @param mo matrix object
         * @param inputInfo input info
         * @return JavaPairRDD handle for a matrix object
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo ) 
+       public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, 
InputInfo inputInfo )
                throws DMLRuntimeException
-       {               
+       {
                //NOTE: MB this logic should be integrated into MatrixObject
-               //However, for now we cannot assume that spark libraries are 
-               //always available and hence only store generic references in 
+               //However, for now we cannot assume that spark libraries are
+               //always available and hence only store generic references in
                //matrix object while all the logic is in the SparkExecContext
-               
+
                JavaSparkContext sc = getSparkContext();
                JavaPairRDD<?,?> rdd = null;
                //CASE 1: rdd already existing (reuse if checkpoint or trigger
-               //pending rdd operations if not yet cached but prevent to 
re-evaluate 
+               //pending rdd operations if not yet cached but prevent to 
re-evaluate
                //rdd operations if already executed and cached
-               if(    mo.getRDDHandle()!=null 
+               if(    mo.getRDDHandle()!=null
                        && (mo.getRDDHandle().isCheckpointRDD() || 
!mo.isCached(false)) )
                {
                        //return existing rdd handling (w/o input format change)
@@ -359,7 +353,7 @@ public class SparkExecutionContext extends ExecutionContext
                                if( mo.isDirty() || !mo.isHDFSFileExists() ) 
//write if necessary
                                        mo.exportData();
                                rdd = sc.hadoopFile( mo.getFileName(), 
inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-                               rdd = 
SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); 
//cp is workaround for read bug                   
+                               rdd = 
SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); 
//cp is workaround for read bug
                                fromFile = true;
                        }
                        else { //default case
@@ -368,7 +362,7 @@ public class SparkExecutionContext extends ExecutionContext
                                mo.release(); //unpin matrix
                                _parRDDs.registerRDD(rdd.id(), 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
-                       
+
                        //keep rdd handle for future operations on it
                        RDDObject rddhandle = new RDDObject(rdd, 
mo.getVarName());
                        rddhandle.setHDFSFile(fromFile);
@@ -396,43 +390,43 @@ public class SparkExecutionContext extends ExecutionContext
                        else {
                                throw new DMLRuntimeException("Incorrect input 
format in getRDDHandleForVariable");
                        }
-                       
+
                        //keep rdd handle for future operations on it
                        RDDObject rddhandle = new RDDObject(rdd, 
mo.getVarName());
                        rddhandle.setHDFSFile(true);
                        mo.setRDDHandle(rddhandle);
                }
-               
+
                return rdd;
        }
-       
+
        /**
         * FIXME: currently this implementation assumes matrix representations 
but frame signature
         * in order to support the old transform implementation.
-        * 
+        *
         * @param fo frame object
         * @param inputInfo input info
         * @return JavaPairRDD handle for a frame object
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, 
InputInfo inputInfo ) 
+       public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, 
InputInfo inputInfo )
                throws DMLRuntimeException
-       {       
+       {
                //NOTE: MB this logic should be integrated into FrameObject
-               //However, for now we cannot assume that spark libraries are 
-               //always available and hence only store generic references in 
+               //However, for now we cannot assume that spark libraries are
+               //always available and hence only store generic references in
                //matrix object while all the logic is in the SparkExecContext
-               
-               InputInfo inputInfo2 = 
(inputInfo==InputInfo.BinaryBlockInputInfo) ? 
+
+               InputInfo inputInfo2 = 
(inputInfo==InputInfo.BinaryBlockInputInfo) ?
                                InputInfo.BinaryBlockFrameInputInfo : inputInfo;
-               
+
                JavaSparkContext sc = getSparkContext();
                JavaPairRDD<?,?> rdd = null;
                //CASE 1: rdd already existing (reuse if checkpoint or trigger
-               //pending rdd operations if not yet cached but prevent to 
re-evaluate 
+               //pending rdd operations if not yet cached but prevent to 
re-evaluate
                //rdd operations if already executed and cached
-               if(    fo.getRDDHandle()!=null 
+               if(    fo.getRDDHandle()!=null
                        && (fo.getRDDHandle().isCheckpointRDD() || 
!fo.isCached(false)) )
                {
                        //return existing rdd handling (w/o input format change)
@@ -451,7 +445,7 @@ public class SparkExecutionContext extends ExecutionContext
                                        fo.exportData();
                                }
                                rdd = sc.hadoopFile( fo.getFileName(), 
inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, 
inputInfo2.inputValueClass);
-                               rdd = ((JavaPairRDD<LongWritable, 
FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is 
workaround for read bug                       
+                               rdd = ((JavaPairRDD<LongWritable, 
FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is 
workaround for read bug
                                fromFile = true;
                        }
                        else { //default case
@@ -460,7 +454,7 @@ public class SparkExecutionContext extends ExecutionContext
                                fo.release(); //unpin frame
                                _parRDDs.registerRDD(rdd.id(), 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
-                       
+
                        //keep rdd handle for future operations on it
                        RDDObject rddhandle = new RDDObject(rdd, 
fo.getVarName());
                        rddhandle.setHDFSFile(fromFile);
@@ -487,64 +481,64 @@ public class SparkExecutionContext extends ExecutionContext
                        else {
                                throw new DMLRuntimeException("Incorrect input 
format in getRDDHandleForVariable");
                        }
-                       
+
                        //keep rdd handle for future operations on it
                        RDDObject rddhandle = new RDDObject(rdd, 
fo.getVarName());
                        rddhandle.setHDFSFile(true);
                        fo.setRDDHandle(rddhandle);
                }
-               
+
                return rdd;
        }
-       
+
        /**
         * TODO So far we only create broadcast variables but never destroy
         * them. This is a memory leak which might lead to executor 
out-of-memory.
-        * However, in order to handle this, we need to keep track when 
broadcast 
+        * However, in order to handle this, we need to keep track when 
broadcast
         * variables are no longer required.
-        * 
+        *
         * @param varname variable name
         * @return wrapper for broadcast variables
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( 
String varname ) 
+       public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( 
String varname )
                throws DMLRuntimeException
-       {               
+       {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                MatrixObject mo = getMatrixObject(varname);
-               
+
                PartitionedBroadcast<MatrixBlock> bret = null;
-               
+
                //reuse existing broadcast handle
-               if( mo.getBroadcastHandle()!=null 
-                       && mo.getBroadcastHandle().isValid() ) 
+               if( mo.getBroadcastHandle()!=null
+                       && mo.getBroadcastHandle().isValid() )
                {
                        bret = mo.getBroadcastHandle().getBroadcast();
                }
-               
+
                //create new broadcast handle (never created, evicted)
-               if( bret == null ) 
+               if( bret == null )
                {
                        //account for overwritten invalid broadcast (e.g., 
evicted)
                        if( mo.getBroadcastHandle()!=null )
                                
CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getSize());
-                       
-                       //obtain meta data for matrix 
+
+                       //obtain meta data for matrix
                        int brlen = (int) mo.getNumRowsPerBlock();
                        int bclen = (int) mo.getNumColumnsPerBlock();
-                       
+
                        //create partitioned matrix block and release memory 
consumed by input
                        MatrixBlock mb = mo.acquireRead();
                        PartitionedBlock<MatrixBlock> pmb = new 
PartitionedBlock<MatrixBlock>(mb, brlen, bclen);
                        mo.release();
-                       
+
                        //determine coarse-grained partitioning
                        int numPerPart = 
PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), 
mo.getNumColumns(), brlen, bclen);
-                       int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+                       int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
                        Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new 
Broadcast[numParts];
-                                       
+
                        //create coarse-grained partitioned broadcasts
                        if( numParts > 1 ) {
                                for( int i=0; i<numParts; i++ ) {
@@ -557,60 +551,60 @@ public class SparkExecutionContext extends ExecutionContext
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
                        }
-               
+
                        bret = new PartitionedBroadcast<MatrixBlock>(ret);
-                       BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<MatrixBlock>(bret, varname, 
+                       BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<MatrixBlock>(bret, varname,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
                        mo.setBroadcastHandle(bchandle);
                        CacheableData.addBroadcastSize(bchandle.getSize());
                }
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkBroadCastTime(System.nanoTime() - 
t0);
                        Statistics.incSparkBroadcastCount(1);
                }
-               
+
                return bret;
        }
-       
+
        @SuppressWarnings("unchecked")
-       public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( 
String varname) 
+       public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( 
String varname)
                throws DMLRuntimeException
-       {               
+       {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                FrameObject fo = getFrameObject(varname);
-               
+
                PartitionedBroadcast<FrameBlock> bret = null;
-               
+
                //reuse existing broadcast handle
-               if( fo.getBroadcastHandle()!=null 
-                       && fo.getBroadcastHandle().isValid() ) 
+               if( fo.getBroadcastHandle()!=null
+                       && fo.getBroadcastHandle().isValid() )
                {
                        bret = fo.getBroadcastHandle().getBroadcast();
                }
-               
+
                //create new broadcast handle (never created, evicted)
-               if( bret == null ) 
+               if( bret == null )
                {
                        //account for overwritten invalid broadcast (e.g., 
evicted)
                        if( fo.getBroadcastHandle()!=null )
                                
CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getSize());
-                       
-                       //obtain meta data for frame 
+
+                       //obtain meta data for frame
                        int bclen = (int) fo.getNumColumns();
                        int brlen = OptimizerUtils.getDefaultFrameSize();
-                       
+
                        //create partitioned frame block and release memory 
consumed by input
                        FrameBlock mb = fo.acquireRead();
                        PartitionedBlock<FrameBlock> pmb = new 
PartitionedBlock<FrameBlock>(mb, brlen, bclen);
                        fo.release();
-                       
+
                        //determine coarse-grained partitioning
                        int numPerPart = 
PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), 
fo.getNumColumns(), brlen, bclen);
-                       int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+                       int numParts = (int) 
Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
                        Broadcast<PartitionedBlock<FrameBlock>>[] ret = new 
Broadcast[numParts];
-                                       
+
                        //create coarse-grained partitioned broadcasts
                        if( numParts > 1 ) {
                                for( int i=0; i<numParts; i++ ) {
@@ -623,41 +617,41 @@ public class SparkExecutionContext extends ExecutionContext
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
                        }
-               
+
                        bret = new PartitionedBroadcast<FrameBlock>(ret);
-                       BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<FrameBlock>(bret, varname,  
+                       BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<FrameBlock>(bret, varname,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
                        fo.setBroadcastHandle(bchandle);
                        CacheableData.addBroadcastSize(bchandle.getSize());
                }
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkBroadCastTime(System.nanoTime() - 
t0);
                        Statistics.incSparkBroadcastCount(1);
                }
-               
+
                return bret;
        }
 
        /**
-        * Keep the output rdd of spark rdd operations as meta data of 
matrix/frame 
+        * Keep the output rdd of spark rdd operations as meta data of 
matrix/frame
         * objects in the symbol table.
-        * 
+        *
         * @param varname variable name
         * @param rdd JavaPairRDD handle for variable
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> 
rdd) 
+       public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> 
rdd)
                throws DMLRuntimeException
        {
                CacheableData<?> obj = getCacheableData(varname);
                RDDObject rddhandle = new RDDObject(rdd, varname);
                obj.setRDDHandle( rddhandle );
        }
-       
+
        /**
         * Utility method for creating an RDD out of an in-memory matrix block.
-        * 
+        *
         * @param sc java spark context
         * @param src matrix block
         * @param brlen block row length
@@ -665,13 +659,13 @@ public class SparkExecutionContext extends ExecutionContext
         * @return JavaPairRDD handle to matrix block
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
+       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
                throws DMLRuntimeException
-       {       
+       {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new 
LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
-               
-               if(    src.getNumRows() <= brlen 
+
+               if(    src.getNumRows() <= brlen
                    && src.getNumColumns() <= bclen )
                {
                        list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new 
MatrixIndexes(1,1), src));
@@ -679,44 +673,44 @@ public class SparkExecutionContext extends ExecutionContext
                else
                {
                        boolean sparse = src.isInSparseFormat();
-                       
+
                        //create and write subblocks of matrix
                        for(int blockRow = 0; blockRow < 
(int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
                                for(int blockCol = 0; blockCol < 
(int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
                                {
                                        int maxRow = (blockRow*brlen + brlen < 
src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
                                        int maxCol = (blockCol*bclen + bclen < 
src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-                                       
+
                                        MatrixBlock block = new 
MatrixBlock(maxRow, maxCol, sparse);
-                                               
+
                                        int row_offset = blockRow*brlen;
                                        int col_offset = blockCol*bclen;
-       
+
                                        //copy submatrix to block
-                                       src.sliceOperations( row_offset, 
row_offset+maxRow-1, 
-                                                                    
col_offset, col_offset+maxCol-1, block );                                       
           
-                                       
+                                       src.sliceOperations( row_offset, 
row_offset+maxRow-1,
+                                                                    
col_offset, col_offset+maxCol-1, block );
+
                                        //append block to sequence file
                                        MatrixIndexes indexes = new 
MatrixIndexes(blockRow+1, blockCol+1);
                                        list.addLast(new 
Tuple2<MatrixIndexes,MatrixBlock>(indexes, block));
                                }
                }
-               
+
                JavaPairRDD<MatrixIndexes,MatrixBlock> result = 
sc.parallelizePairs(list);
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkParallelizeTime(System.nanoTime() - 
t0);
                        Statistics.incSparkParallelizeCount(1);
                }
-               
+
                return result;
        }
 
-       public static JavaPairRDD<Long,FrameBlock> 
toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) 
+       public static JavaPairRDD<Long,FrameBlock> 
toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
                throws DMLRuntimeException
-       {       
+       {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                LinkedList<Tuple2<Long,FrameBlock>> list = new 
LinkedList<Tuple2<Long,FrameBlock>>();
-                       
+
                //create and write subblocks of matrix
                int blksize = ConfigurationManager.getBlocksize();
                for(int blockRow = 0; blockRow < 
(int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++)
@@ -725,28 +719,28 @@ public class SparkExecutionContext extends ExecutionContext
                        int roffset = blockRow*blksize;
 
                        FrameBlock block = new FrameBlock(src.getSchema());
-                       
+
                        //copy sub frame to block, incl meta data on first
-                       src.sliceOperations( roffset, roffset+maxRow-1, 0, 
src.getNumColumns()-1, block );              
+                       src.sliceOperations( roffset, roffset+maxRow-1, 0, 
src.getNumColumns()-1, block );
                        if( roffset == 0 )
                                
block.setColumnMetadata(src.getColumnMetadata());
-                       
+
                        //append block to sequence file
                        list.addLast(new 
Tuple2<Long,FrameBlock>((long)roffset+1, block));
                }
-               
+
                JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list);
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkParallelizeTime(System.nanoTime() - 
t0);
                        Statistics.incSparkParallelizeCount(1);
                }
-               
+
                return result;
        }
-       
+
        /**
         * This method is a generic abstraction for calls from the buffer pool.
-        * 
+        *
         * @param rdd rdd object
         * @param rlen number of rows
         * @param clen number of columns
@@ -757,21 +751,21 @@ public class SparkExecutionContext extends ExecutionContext
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        @SuppressWarnings("unchecked")
-       public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int 
clen, int brlen, int bclen, long nnz) 
+       public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int 
clen, int brlen, int bclen, long nnz)
                throws DMLRuntimeException
-       {                       
+       {
                return toMatrixBlock(
-                               (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
rdd.getRDD(), 
+                               (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
rdd.getRDD(),
                                rlen, clen, brlen, bclen, nnz);
        }
-       
+
        /**
-        * Utility method for creating a single matrix block out of a binary 
block RDD. 
-        * Note that this collect call might trigger execution of any pending 
transformations. 
-        * 
+        * Utility method for creating a single matrix block out of a binary 
block RDD.
+        * Note that this collect call might trigger execution of any pending 
transformations.
+        *
         * NOTE: This is an unguarded utility function, which requires memory 
for both the output matrix
         * and its collected, blocked representation.
-        * 
+        *
         * @param rdd JavaPairRDD for matrix block
         * @param rlen number of rows
         * @param clen number of columns
@@ -781,19 +775,19 @@ public class SparkExecutionContext extends ExecutionContext
         * @return matrix block
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static MatrixBlock 
toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, 
int brlen, int bclen, long nnz) 
+       public static MatrixBlock 
toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, 
int brlen, int bclen, long nnz)
                throws DMLRuntimeException
        {
-               
+
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                MatrixBlock out = null;
-               
+
                if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
                {
                        //special case without copy and nnz maintenance
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
-                       
+
                        if( list.size()>1 )
                                throw new DMLRuntimeException("Expecting no 
more than one result block.");
                        else if( list.size()==1 )
@@ -806,70 +800,70 @@ public class SparkExecutionContext extends ExecutionContext
                        //determine target sparse/dense representation
                        long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
                        boolean sparse = 
MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-                       
+
                        //create output matrix block (w/ lazy allocation)
                        out = new MatrixBlock(rlen, clen, sparse, lnnz);
-                       
+
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
-                       
+
                        //copy blocks one-at-a-time into output matrix block
-                       long aNnz = 0; 
+                       long aNnz = 0;
                        for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
                        {
                                //unpack index-block pair
                                MatrixIndexes ix = keyval._1();
                                MatrixBlock block = keyval._2();
-                               
+
                                //compute row/column block offsets
                                int row_offset = 
(int)(ix.getRowIndex()-1)*brlen;
                                int col_offset = 
(int)(ix.getColumnIndex()-1)*bclen;
                                int rows = block.getNumRows();
                                int cols = block.getNumColumns();
-                               
+
                                //append block
                                if( sparse ) { //SPARSE OUTPUT
                                        //append block to sparse target in 
order to avoid shifting, where
                                        //we use a shallow row copy in case of 
MCSR and single column blocks
-                                       //note: this append requires, for 
multiple column blocks, a final sort 
+                                       //note: this append requires, for 
multiple column blocks, a final sort
                                        out.appendToSparse(block, row_offset, 
col_offset, clen>bclen);
                                }
                                else { //DENSE OUTPUT
-                                       out.copy( row_offset, 
row_offset+rows-1, 
-                                                         col_offset, 
col_offset+cols-1, block, false );        
+                                       out.copy( row_offset, row_offset+rows-1,
+                                                         col_offset, 
col_offset+cols-1, block, false );
                                }
-                               
+
                                //incremental maintenance nnz
                                aNnz += block.getNonZeros();
                        }
-                       
+
                        //post-processing output matrix
                        if( sparse && clen>bclen )
                                out.sortSparseRows();
                        out.setNonZeros(aNnz);
                        out.examSparsity();
                }
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkCollectTime(System.nanoTime() - t0);
                        Statistics.incSparkCollectCount(1);
                }
-               
+
                return out;
        }
-       
+
        @SuppressWarnings("unchecked")
-       public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int 
clen, long nnz) 
+       public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int 
clen, long nnz)
                throws DMLRuntimeException
-       {                       
+       {
                return toMatrixBlock(
-                               (JavaPairRDD<MatrixIndexes, MatrixCell>) 
rdd.getRDD(), 
+                               (JavaPairRDD<MatrixIndexes, MatrixCell>) 
rdd.getRDD(),
                                rlen, clen, nnz);
        }
-       
+
        /**
-        * Utility method for creating a single matrix block out of a binary 
cell RDD. 
-        * Note that this collect call might trigger execution of any pending 
transformations. 
-        * 
+        * Utility method for creating a single matrix block out of a binary 
cell RDD.
+        * Note that this collect call might trigger execution of any pending 
transformations.
+        *
         * @param rdd JavaPairRDD for matrix block
         * @param rlen number of rows
         * @param clen number of columns
@@ -877,57 +871,57 @@ public class SparkExecutionContext extends ExecutionContext
         * @return matrix block
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, 
MatrixCell> rdd, int rlen, int clen, long nnz) 
+       public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, 
MatrixCell> rdd, int rlen, int clen, long nnz)
                throws DMLRuntimeException
-       {       
+       {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                MatrixBlock out = null;
-               
+
                //determine target sparse/dense representation
                long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
                boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, 
clen, lnnz);
-                               
+
                //create output matrix block (w/ lazy allocation)
                out = new MatrixBlock(rlen, clen, sparse);
-               
+
                List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
-               
+
                //copy blocks one-at-a-time into output matrix block
                for( Tuple2<MatrixIndexes,MatrixCell> keyval : list )
                {
                        //unpack index-block pair
                        MatrixIndexes ix = keyval._1();
                        MatrixCell cell = keyval._2();
-                       
+
                        //append cell to dense/sparse target in order to avoid 
shifting for sparse
                        //note: this append requires a final sort of sparse rows
                        out.appendValue((int)ix.getRowIndex()-1, 
(int)ix.getColumnIndex()-1, cell.getValue());
                }
-               
+
                //post-processing output matrix
                if( sparse )
                        out.sortSparseRows();
                out.recomputeNonZeros();
                out.examSparsity();
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkCollectTime(System.nanoTime() - t0);
                        Statistics.incSparkCollectCount(1);
                }
-               
+
                return out;
        }
 
-       public static PartitionedBlock<MatrixBlock> 
toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, 
int clen, int brlen, int bclen, long nnz) 
+       public static PartitionedBlock<MatrixBlock> 
toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, 
int clen, int brlen, int bclen, long nnz)
                throws DMLRuntimeException
        {
-               
+
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                PartitionedBlock<MatrixBlock> out = new 
PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen);
                List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-               
+
                //copy blocks one-at-a-time into output matrix block
                for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
                {
@@ -936,24 +930,24 @@ public class SparkExecutionContext extends ExecutionContext
                        MatrixBlock block = keyval._2();
                        out.setBlock((int)ix.getRowIndex(), 
(int)ix.getColumnIndex(), block);
                }
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkCollectTime(System.nanoTime() - t0);
                        Statistics.incSparkCollectCount(1);
                }
-                               
+
                return out;
        }
 
        @SuppressWarnings("unchecked")
-       public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] 
schema, int rlen, int clen) 
-               throws DMLRuntimeException 
+       public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] 
schema, int rlen, int clen)
+               throws DMLRuntimeException
        {
                JavaPairRDD<Long,FrameBlock> lrdd = 
(JavaPairRDD<Long,FrameBlock>) rdd.getRDD();
                return toFrameBlock(lrdd, schema, rlen, clen);
        }
 
-       public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, 
ValueType[] schema, int rlen, int clen) 
+       public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, 
ValueType[] schema, int rlen, int clen)
                throws DMLRuntimeException
        {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -964,16 +958,16 @@ public class SparkExecutionContext extends ExecutionContext
                //create output frame block (w/ lazy allocation)
                FrameBlock out = new FrameBlock(schema);
                out.ensureAllocatedColumns(rlen);
-               
+
                List<Tuple2<Long,FrameBlock>> list = rdd.collect();
-               
+
                //copy blocks one-at-a-time into output matrix block
                for( Tuple2<Long,FrameBlock> keyval : list )
                {
                        //unpack index-block pair
                        int ix = (int)(keyval._1() - 1);
                        FrameBlock block = keyval._2();
-               
+
                        //copy into output frame
                        out.copy( ix, ix+block.getNumRows()-1, 0, 
block.getNumColumns()-1, block );
                        if( ix == 0 ) {
@@ -981,12 +975,12 @@ public class SparkExecutionContext extends ExecutionContext
                                
out.setColumnMetadata(block.getColumnMetadata());
                        }
                }
-               
+
                if (DMLScript.STATISTICS) {
                        Statistics.accSparkCollectTime(System.nanoTime() - t0);
                        Statistics.incSparkCollectCount(1);
                }
-               
+
                return out;
        }
 
@@ -994,17 +988,17 @@ public class SparkExecutionContext extends ExecutionContext
        public static long writeRDDtoHDFS( RDDObject rdd, String path, 
OutputInfo oinfo )
        {
                JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-               
+
                //piggyback nnz maintenance on write
                LongAccumulator aNnz = 
getSparkContextStatic().sc().longAccumulator("nnz");
                lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
-               
+
                //save file is an action which also triggers nnz maintenance
-               lrdd.saveAsHadoopFile(path, 
-                               oinfo.outputKeyClass, 
-                               oinfo.outputValueClass, 
+               lrdd.saveAsHadoopFile(path,
+                               oinfo.outputKeyClass,
+                               oinfo.outputValueClass,
                                oinfo.outputFormatClass);
-               
+
                //return nnz aggregate of all blocks
                return aNnz.value();
        }
@@ -1013,58 +1007,58 @@ public class SparkExecutionContext extends ExecutionContext
        public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, 
OutputInfo oinfo )
        {
                JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, 
FrameBlock>) rdd.getRDD();
-               
+
                //convert keys to writables if necessary
                if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
                        lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair(
                                        new 
LongFrameToLongWritableFrameFunction());
                        oinfo = OutputInfo.BinaryBlockFrameOutputInfo;
                }
-       
+
                //save file is an action which also triggers nnz maintenance
-               lrdd.saveAsHadoopFile(path, 
-                               oinfo.outputKeyClass, 
-                               oinfo.outputValueClass, 
+               lrdd.saveAsHadoopFile(path,
+                               oinfo.outputKeyClass,
+                               oinfo.outputValueClass,
                                oinfo.outputFormatClass);
        }
-       
+
        ///////////////////////////////////////////
        // Cleanup of RDDs and Broadcast variables
        ///////
-       
+
        /**
         * Adds a child rdd object to the lineage of a parent rdd.
-        * 
+        *
         * @param varParent parent variable
         * @param varChild child variable
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void addLineageRDD(String varParent, String varChild) 
-               throws DMLRuntimeException 
+       public void addLineageRDD(String varParent, String varChild)
+               throws DMLRuntimeException
        {
                RDDObject parent = getCacheableData(varParent).getRDDHandle();
                RDDObject child = getCacheableData(varChild).getRDDHandle();
-               
+
                parent.addLineageChild( child );
        }
-       
+
        /**
         * Adds a child broadcast object to the lineage of a parent rdd.
-        * 
+        *
         * @param varParent parent variable
         * @param varChild child variable
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void addLineageBroadcast(String varParent, String varChild) 
-               throws DMLRuntimeException 
+       public void addLineageBroadcast(String varParent, String varChild)
+               throws DMLRuntimeException
        {
                RDDObject parent = getCacheableData(varParent).getRDDHandle();
                BroadcastObject<?> child = 
getCacheableData(varChild).getBroadcastHandle();
-               
+
                parent.addLineageChild( child );
        }
 
-       public void addLineage(String varParent, String varChild, boolean 
broadcast) 
+       public void addLineage(String varParent, String varChild, boolean 
broadcast)
                throws DMLRuntimeException
        {
                if( broadcast )
@@ -1072,25 +1066,25 @@ public class SparkExecutionContext extends ExecutionContext
                else
                        addLineageRDD(varParent, varChild);
        }
-       
+
        @Override
-       public void cleanupMatrixObject( MatrixObject mo ) 
+       public void cleanupMatrixObject( MatrixObject mo )
                throws DMLRuntimeException
        {
                //NOTE: this method overwrites the default behavior of 
cleanupMatrixObject
                //and hence is transparently used by rmvar instructions and 
other users. The
                //core difference is the lineage-based cleanup of RDD and 
broadcast variables.
-               
+
                try
                {
-                       if ( mo.isCleanupEnabled() ) 
+                       if ( mo.isCleanupEnabled() )
                        {
                                //compute ref count only if matrix cleanup 
actually necessary
-                               if ( !getVariables().hasReferences(mo) ) 
+                               if ( !getVariables().hasReferences(mo) )
                                {
-                                       //clean cached data     
-                                       mo.clearData(); 
-                                       
+                                       //clean cached data
+                                       mo.clearData();
+
                                        //clean hdfs data if no pending rdd 
operations on it
                                        if( mo.isHDFSFileExists() && 
mo.getFileName()!=null ) {
                                                if( mo.getRDDHandle()==null ) {
@@ -1101,12 +1095,12 @@ public class SparkExecutionContext extends ExecutionContext
                                                        
rdd.setHDFSFilename(mo.getFileName());
                                                }
                                        }
-                                       
+
                                        //cleanup RDD and broadcast variables 
(recursive)
                                        //note: requires that mo.clearData 
already removed back references
-                                       if( mo.getRDDHandle()!=null ) { 
+                                       if( mo.getRDDHandle()!=null ) {
                                                
rCleanupLineageObject(mo.getRDDHandle());
-                                       }       
+                                       }
                                        if( mo.getBroadcastHandle()!=null ) {
                                                
rCleanupLineageObject(mo.getBroadcastHandle());
                                        }
@@ -1120,18 +1114,18 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
-       private void rCleanupLineageObject(LineageObject lob) 
+       private void rCleanupLineageObject(LineageObject lob)
                throws IOException
-       {               
+       {
                //abort recursive cleanup if still consumers
                if( lob.getNumReferences() > 0 )
                        return;
-                       
-               //abort if still reachable through matrix object (via back 
references for 
+
+               //abort if still reachable through matrix object (via back 
references for
                //robustness in function calls and to prevent repeated scans of 
the symbol table)
                if( lob.hasBackReference() )
                        return;
-               
+
                //cleanup current lineage object (from driver/executors)
                //incl deferred hdfs file removal (only if metadata set by 
cleanup call)
                if( lob instanceof RDDObject ) {
@@ -1151,38 +1145,38 @@ public class SparkExecutionContext extends 
ExecutionContext
                                        cleanupBroadcastVariable(bc);
                        
CacheableData.addBroadcastSize(-((BroadcastObject)lob).getSize());
                }
-       
+
                //recursively process lineage children
                for( LineageObject c : lob.getLineageChilds() ){
                        c.decrementNumReferences();
                        rCleanupLineageObject(c);
                }
        }
-       
+
        /**
         * This call destroys a broadcast variable at all executors and the 
driver.
         * Hence, it is intended to be used on rmvar only. Depending on the
         * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-        * 
+        *
         * @param bvar broadcast variable
         */
-       public static void cleanupBroadcastVariable(Broadcast<?> bvar) 
+       public static void cleanupBroadcastVariable(Broadcast<?> bvar)
        {
-               //In comparison to 'unpersist' (which would only delete the 
broadcast 
+               //In comparison to 'unpersist' (which would only delete the 
broadcast
                //from the executors), this call also deletes related data from 
the driver.
                if( bvar.isValid() ) {
                        bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
                }
        }
-       
+
        /**
         * This call removes an rdd variable from executor memory and disk if 
required.
         * Hence, it is intended to be used on rmvar only. Depending on the
         * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-        * 
+        *
         * @param rvar rdd variable to remove
         */
-       public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) 
+       public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
        {
                if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
                        rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY );
@@ -1190,72 +1184,72 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        @SuppressWarnings("unchecked")
-       public void repartitionAndCacheMatrixObject( String var ) 
+       public void repartitionAndCacheMatrixObject( String var )
                throws DMLRuntimeException
        {
                MatrixObject mo = getMatrixObject(var);
                MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
-               
+
                //double check size to avoid unnecessary spark context creation
                if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), 
(double)
                                OptimizerUtils.estimateSizeExactSparsity(mcIn)) 
)
-                       return; 
-               
+                       return;
+
                //get input rdd and default storage level
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>)
                                getRDDHandleForMatrixObject(mo, 
InputInfo.BinaryBlockInputInfo);
-               
+
                //avoid unnecessary caching of input in order to reduce memory 
pressure
                if( mo.getRDDHandle().allowsShortCircuitRead()
                        && isRDDMarkedForCaching(in.id()) && 
!isRDDCached(in.id()) ) {
                        in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
                                        
((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
-                       
+
                        //investigate issue of unnecessarily large number of 
partitions
                        int numPartitions = 
SparkUtils.getNumPreferredPartitions(mcIn, in);
                        if( numPartitions < in.getNumPartitions() )
                                in = in.coalesce( numPartitions );
                }
-               
-               //repartition rdd (force creation of shuffled rdd via merge), 
note: without deep copy albeit 
+
+               //repartition rdd (force creation of shuffled rdd via merge), 
note: without deep copy albeit
                //executed on the original data, because there will be no 
merge, i.e., no key duplicates
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDAggregateUtils.mergeByKey(in, false);
-               
+
                //convert mcsr into memory-efficient csr if potentially sparse
-               if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {      
                        
+               if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
                        out = out.mapValues(new 
CreateSparseBlockFunction(SparseBlock.Type.CSR));
                }
-               
-               //persist rdd in default storage level 
+
+               //persist rdd in default storage level
                out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
                   .count(); //trigger caching to prevent contention
-               
+
                //create new rdd handle, in-place of current matrix object
                RDDObject inro =  mo.getRDDHandle();       //guaranteed to 
exist (see above)
                RDDObject outro = new RDDObject(out, var); //create new rdd 
object
                outro.setCheckpointRDD(true);              //mark as 
checkpointed
                outro.addLineageChild(inro);               //keep lineage to 
prevent cycles on cleanup
-               mo.setRDDHandle(outro);                                
+               mo.setRDDHandle(outro);
        }
 
        @SuppressWarnings("unchecked")
-       public void cacheMatrixObject( String var ) 
+       public void cacheMatrixObject( String var )
                throws DMLRuntimeException
        {
                //get input rdd and default storage level
                MatrixObject mo = getMatrixObject(var);
-               
+
                //double check size to avoid unnecessary spark context creation
                if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), 
(double)
                                
OptimizerUtils.estimateSizeExactSparsity(mo.getMatrixCharacteristics())) )
-                       return; 
-               
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+                       return;
+
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>)
                                getRDDHandleForMatrixObject(mo, 
InputInfo.BinaryBlockInputInfo);
-               
+
                //persist rdd (force rdd caching, if not already cached)
                if( !isRDDCached(in.id()) )
-                       in.count(); //trigger caching to prevent contention     
                       
+                       in.count(); //trigger caching to prevent contention
        }
 
        public void setThreadLocalSchedulerPool(String poolName) {
@@ -1283,7 +1277,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
                        return false;
                }
-               
+
                //check that rdd is actually already cached
                for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
                        if( info.id() == rddID )
@@ -1293,35 +1287,35 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        ///////////////////////////////////////////
-       // Spark configuration handling 
+       // Spark configuration handling
        ///////
 
        /**
-        * Obtains the lazily analyzed spark cluster configuration. 
-        * 
+        * Obtains the lazily analyzed spark cluster configuration.
+        *
         * @return spark cluster configuration
         */
        public static SparkClusterConfig getSparkClusterConfig() {
-               //lazy creation of spark cluster config         
+               //lazy creation of spark cluster config
                if( _sconf == null )
                        _sconf = new SparkClusterConfig();
                return _sconf;
        }
-       
+
        /**
         * Obtains the available memory budget for broadcast variables in bytes.
-        * 
+        *
         * @return broadcast memory budget
         */
        public static double getBroadcastMemoryBudget() {
                return getSparkClusterConfig()
                        .getBroadcastMemoryBudget();
        }
-       
+
        /**
         * Obtain the available memory budget for data storage in bytes.
-        * 
-        * @param min      flag for minimum data budget 
+        *
+        * @param min      flag for minimum data budget
         * @param refresh  flag for refresh with spark context
         * @return data memory budget
         */
@@ -1329,21 +1323,21 @@ public class SparkExecutionContext extends 
ExecutionContext
                return getSparkClusterConfig()
                        .getDataMemoryBudget(min, refresh);
        }
-       
+
        /**
         * Obtain the number of executors in the cluster (excluding the driver).
-        * 
+        *
         * @return number of executors
         */
        public static int getNumExecutors() {
                return getSparkClusterConfig()
                        .getNumExecutors();
        }
-       
+
        /**
-        * Obtain the default degree of parallelism (cores in the cluster). 
-        * 
-        * @param refresh  flag for refresh with spark context 
+        * Obtain the default degree of parallelism (cores in the cluster).
+        *
+        * @param refresh  flag for refresh with spark context
         * @return default degree of parallelism
         */
        public static int getDefaultParallelism(boolean refresh) {
@@ -1360,13 +1354,13 @@ public class SparkExecutionContext extends 
ExecutionContext
                int numExecutors = getNumExecutors();
                int numCores = getDefaultParallelism(false);
                boolean multiThreaded = (numCores > numExecutors);
-               
+
                //check for jdk version less than 8 (and raise warning if 
multi-threaded)
-               if( isLtJDK8 && multiThreaded) 
+               if( isLtJDK8 && multiThreaded)
                {
-                       //get the jre version 
+                       //get the jre version
                        String version = System.getProperty("java.version");
-                       
+
                        
LOG.warn("########################################################################################");
                        LOG.warn("### WARNING: Multi-threaded text reblock may 
lead to thread contention on JRE < 1.8 ####");
                        LOG.warn("### java.version = " + version);
@@ -1377,51 +1371,51 @@ public class SparkExecutionContext extends 
ExecutionContext
                        
LOG.warn("########################################################################################");
                }
        }
-       
+
        /**
-        * Captures relevant spark cluster configuration properties, e.g., 
memory budgets and 
+        * Captures relevant spark cluster configuration properties, e.g., 
memory budgets and
         * degree of parallelism. This configuration abstracts legacy (< Spark 
1.6) and current
-        * configurations and provides a unified view. 
+        * configurations and provides a unified view.
         */
-       private static class SparkClusterConfig 
+       private static class SparkClusterConfig
        {
                //broadcasts are stored in mem-and-disk in data space, this 
config
                //defines the fraction of data space to be used as broadcast 
budget
                private static final double BROADCAST_DATA_FRACTION = 0.3;
-               
+
                //forward private config from Spark's 
UnifiedMemoryManager.scala (>1.6)
                private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 
1024 * 1024;
-               
+
                //meta configurations
                private boolean _legacyVersion = false; //spark version <1.6
                private boolean _confOnly = false; //infrastructure info based 
on config
-               
+
                //memory management configurations
                private long _memExecutor = -1; //mem per executor
                private double _memDataMinFrac = -1; //minimum data fraction
                private double _memDataMaxFrac = -1; //maximum data fraction
                private double _memBroadcastFrac = -1; //broadcast fraction
-               
+
                //degree of parallelism configurations
                private int _numExecutors = -1; //total executors
-               private int _defaultPar = -1; //total vcores  
-       
-               public SparkClusterConfig() 
+               private int _defaultPar = -1; //total vcores
+
+               public SparkClusterConfig()
                {
                        SparkConf sconf = createSystemMLSparkConf();
                        _confOnly = true;
-                       
+
                        //parse version and config
                        String sparkVersion = getSparkVersionString();
                        _legacyVersion = 
(UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0
                                        || 
sconf.getBoolean("spark.memory.useLegacyMode", false) );
-       
+
                        //obtain basic spark configurations
                        if( _legacyVersion )
                                analyzeSparkConfiguationLegacy(sconf);
                        else
                                analyzeSparkConfiguation(sconf);
-       
+
                        //log debug of created spark cluster config
                        if( LOG.isDebugEnabled() )
                                LOG.debug( this.toString() );
@@ -1432,30 +1426,30 @@ public class SparkExecutionContext extends 
ExecutionContext
                }
 
                public long getDataMemoryBudget(boolean min, boolean refresh) {
-                       //always get the current num executors on refresh 
because this might 
+                       //always get the current num executors on refresh 
because this might
                        //change if not all executors are initially allocated 
and it is plan-relevant
                        int numExec = _numExecutors;
                        if( refresh && !_confOnly ) {
                                JavaSparkContext jsc = getSparkContextStatic();
                                numExec = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
                        }
-                       
+
                        //compute data memory budget
                        return (long) ( numExec * _memExecutor *
-                               (min ? _memDataMinFrac : _memDataMaxFrac) );    
+                               (min ? _memDataMinFrac : _memDataMaxFrac) );
                }
 
                public int getNumExecutors() {
                        if( _numExecutors < 0 )
-                               analyzeSparkParallelismConfiguation(null);      
                
+                               analyzeSparkParallelismConfiguation(null);
                        return _numExecutors;
                }
 
                public int getDefaultParallelism(boolean refresh) {
                        if( _defaultPar < 0 && !refresh )
                                analyzeSparkParallelismConfiguation(null);
-                       
-                       //always get the current default parallelism on refresh 
because this might 
+
+                       //always get the current default parallelism on refresh 
because this might
                        //change if not all executors are initially allocated 
and it is plan-relevant
                        return ( refresh && !_confOnly ) ?
                                getSparkContextStatic().defaultParallelism() : 
_defaultPar;
@@ -1464,36 +1458,36 @@ public class SparkExecutionContext extends 
ExecutionContext
                public void analyzeSparkConfiguationLegacy(SparkConf conf)  {
                        //ensure allocated spark conf
                        SparkConf sconf = (conf == null) ? 
createSystemMLSparkConf() : conf;
-                       
+
                        //parse absolute executor memory
                        _memExecutor = UtilFunctions.parseMemorySize(
                                        sconf.get("spark.executor.memory", 
"1g"));
-                       
+
                        //get data and shuffle memory ratios (defaults not 
specified in job conf)
                        double dataFrac = 
sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
                        _memDataMinFrac = dataFrac;
                        _memDataMaxFrac = dataFrac;
                        _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; 
//default 18%
-                       
-                       //analyze spark degree of parallelism 
-                       analyzeSparkParallelismConfiguation(sconf);     
+
+                       //analyze spark degree of parallelism
+                       analyzeSparkParallelismConfiguation(sconf);
                }
 
                public void analyzeSparkConfiguation(SparkConf conf) {
                        //ensure allocated spark conf
                        SparkConf sconf = (conf == null) ? 
createSystemMLSparkConf() : conf;
-                       
+
                        //parse absolute executor memory, incl fixed cut off
                        _memExecutor = UtilFunctions.parseMemorySize(
-                                       sconf.get("spark.executor.memory", 
"1g")) 
+                                       sconf.get("spark.executor.memory", 
"1g"))
                                        - RESERVED_SYSTEM_MEMORY_BYTES;
-                       
+
                        //get data and shuffle memory ratios (defaults not 
specified in job conf)
                        _memDataMinFrac = 
sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
                        _memDataMaxFrac = 
sconf.getDouble("spark.memory.fraction", 0.75); //default 75%
                        _memBroadcastFrac = _memDataMaxFrac * 
BROADCAST_DATA_FRACTION; //default 22.5%
-                       
-                       //analyze spark degree of parallelism 
+
+                       //analyze spark degree of parallelism
                        analyzeSparkParallelismConfiguation(sconf);
                }
 
@@ -1501,7 +1495,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                        int numExecutors = 
sconf.getInt("spark.executor.instances", -1);
                        int numCoresPerExec = 
sconf.getInt("spark.executor.cores", -1);
                        int defaultPar = 
sconf.getInt("spark.default.parallelism", -1);
-                       
+
                        if( numExecutors > 1 && (defaultPar > 1 || 
numCoresPerExec > 1) ) {
                                _numExecutors = numExecutors;
                                _defaultPar = (defaultPar>1) ? defaultPar : 
numExecutors * numCoresPerExec;
@@ -1512,28 +1506,28 @@ public class SparkExecutionContext extends 
ExecutionContext
                                //note: spark context provides this information 
while conf does not
                                //(for num executors we need to correct for 
driver and local mode)
                                JavaSparkContext jsc = getSparkContextStatic();
-                               _numExecutors = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);  
+                               _numExecutors = 
Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
                                _defaultPar = jsc.defaultParallelism();
-                               _confOnly &= false; //implies env info refresh 
w/ spark context 
+                               _confOnly &= false; //implies env info refresh 
w/ spark context
                        }
                }
-               
+
                /**
                 * Obtains the spark version string. If the spark context has 
been created,
-                * we simply get it from the context; otherwise, we use Spark 
internal 
-                * constants to avoid creating the spark context just for the 
version. 
-                * 
+                * we simply get it from the context; otherwise, we use Spark 
internal
+                * constants to avoid creating the spark context just for the 
version.
+                *
                 * @return spark version string
                 */
                private String getSparkVersionString() {
                        //check for existing spark context
-                       if( isSparkContextCreated() ) 
+                       if( isSparkContextCreated() )
                                return getSparkContextStatic().version();
-                       
+
                        //use spark internal constant to avoid context creation
                        return 
org.apache.spark.package$.MODULE$.SPARK_VERSION();
                }
-               
+
                @Override
                public String toString() {
                        StringBuilder sb = new 
StringBuilder("SparkClusterConfig: \n");
@@ -1544,17 +1538,17 @@ public class SparkExecutionContext extends 
ExecutionContext
                        sb.append("-- memDataMaxFrac   = " + _memDataMaxFrac + 
"\n");
                        sb.append("-- memBroadcastFrac = " + _memBroadcastFrac 
+ "\n");
                        sb.append("-- numExecutors     = " + _numExecutors + 
"\n");
-                       sb.append("-- defaultPar       = " + _defaultPar + 
"\n");               
+                       sb.append("-- defaultPar       = " + _defaultPar + 
"\n");
                        return sb.toString();
                }
        }
-       
-       private static class MemoryManagerParRDDs 
+
+       private static class MemoryManagerParRDDs
        {
                private final long _limit;
                private long _size;
                private HashMap<Integer, Long> _rdds;
-               
+
                public MemoryManagerParRDDs(double fractionMem) {
                        _limit = (long)(fractionMem * 
InfrastructureAnalyzer.getLocalMaxMemory());
                        _size = 0;
@@ -1566,7 +1560,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                        _size += ret ? rddSize : 0;
                        return ret;
                }
-               
+
                public synchronized void registerRDD(int rddID, long rddSize, 
boolean reserved) {
                        if( !reserved ) {
                                throw new RuntimeException("Unsupported rdd 
registration "
@@ -1574,7 +1568,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                        }
                        _rdds.put(rddID, rddSize);
                }
-               
+
                public synchronized void deregisterRDD(int rddID) {
                        long rddSize = _rdds.remove(rddID);
                        _size -= rddSize;
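
For readers skimming the SparkClusterConfig changes above: the unified-memory
path (Spark >= 1.6) boils down to a few multiplications over the executor
memory and the spark.memory.* fractions. The standalone sketch below simply
reproduces that arithmetic with the defaults named in the patch
(spark.memory.storageFraction = 0.5, spark.memory.fraction = 0.75, a 300 MB
reserved cut-off, and a 0.3 broadcast share of the data budget). The class
name, executor memory, and executor count are illustrative assumptions, not
part of the commit.

    // Minimal sketch of the budget arithmetic in
    // SparkClusterConfig.analyzeSparkConfiguation / getDataMemoryBudget.
    public class SparkBudgetSketch {
        static final long RESERVED_SYSTEM_MEMORY_BYTES = 300L * 1024 * 1024; // fixed cut-off
        static final double BROADCAST_DATA_FRACTION = 0.3; // broadcast share of data budget

        public static void main(String[] args) {
            long executorMem   = 4L * 1024 * 1024 * 1024; // spark.executor.memory = 4g (assumed)
            double minDataFrac = 0.5;   // spark.memory.storageFraction default
            double maxDataFrac = 0.75;  // spark.memory.fraction default
            int numExecutors   = 10;    // spark.executor.instances (assumed)

            long memExecutor = executorMem - RESERVED_SYSTEM_MEMORY_BYTES;
            long dataMin = (long) (numExecutors * memExecutor * minDataFrac);
            long dataMax = (long) (numExecutors * memExecutor * maxDataFrac);
            double broadcastFrac = maxDataFrac * BROADCAST_DATA_FRACTION; // 22.5% per executor

            System.out.printf("data budget min=%d max=%d bytes, broadcast fraction=%.3f%n",
                    dataMin, dataMax, broadcastFrac);
        }
    }

With these assumed values the sketch reports roughly 18.5 GiB as the minimum
and 27.8 GiB as the maximum cluster-wide data budget.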

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
deleted file mode 100644
index a1173bd..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.functions;
-
-import java.io.Serializable;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.MLBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-@SuppressWarnings("deprecation")
-public class GetMLBlock implements Function<Tuple2<MatrixIndexes,MatrixBlock>, 
Row>, Serializable {
-
-       private static final long serialVersionUID = 8829736765002126985L;
-
-       @Override
-       public Row call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception 
{
-               return new MLBlock(kv._1, kv._2);
-       }
-       
-}
\ No newline at end of file
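
The next diff touches RDDConverterUtilsExt; its Javadoc below keeps a Scala
call to coordinateMatrixToBinaryBlock, and an equivalent Java invocation
looks roughly as follows. This is a usage sketch only: the wrapper class is
hypothetical, the import paths are assumptions based on the packages visible
in this commit, and the 1000x1000 block size mirrors the Javadoc example.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

    public class CoordinateToBinaryBlockExample {
        // Converts an MLlib CoordinateMatrix into SystemML binary blocks,
        // mirroring the Scala snippet in the coordinateMatrixToBinaryBlock Javadoc.
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> convert(
                JavaSparkContext jsc, CoordinateMatrix input,
                long rows, long cols, long nnz) throws DMLRuntimeException {
            // 1000x1000 blocking as in the Javadoc example; true = also emit empty blocks
            MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, 1000, 1000, nnz);
            return RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(jsc, input, mc, true);
        }
    }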

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f206fbd..377ca2e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,14 +63,14 @@ import scala.Tuple2;
  * can be moved to RDDConverterUtils.
  */
 @SuppressWarnings("unused")
-public class RDDConverterUtilsExt 
+public class RDDConverterUtilsExt
 {
        public enum RDDConverterTypes {
                TEXT_TO_MATRIX_CELL,
                MATRIXENTRY_TO_MATRIXCELL
        }
-       
-       
+
+
        /**
         * Example usage:
         * <pre><code>
@@ -88,7 +88,7 @@ public class RDDConverterUtilsExt
         * val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz)
         * val binBlocks = 
RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), 
coordinateMatrix, mc, true)
         * </code></pre>
-        * 
+        *
         * @param sc java spark context
         * @param input coordinate matrix
         * @param mcIn matrix characteristics
@@ -97,26 +97,26 @@ public class RDDConverterUtilsExt
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
coordinateMatrixToBinaryBlock(JavaSparkContext sc,
-                       CoordinateMatrix input, MatrixCharacteristics mcIn, 
boolean outputEmptyBlocks) throws DMLRuntimeException 
+                       CoordinateMatrix input, MatrixCharacteristics mcIn, 
boolean outputEmptyBlocks) throws DMLRuntimeException
        {
                //convert matrix entry rdd to binary block rdd (w/ partial 
blocks)
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
input.entries().toJavaRDD()
                                .mapPartitionsToPair(new 
MatrixEntryToBinaryBlockFunction(mcIn));
-               
-               //inject empty blocks (if necessary) 
+
+               //inject empty blocks (if necessary)
                if( outputEmptyBlocks && mcIn.mightHaveEmptyBlocks() ) {
-                       out = out.union( 
+                       out = out.union(
                                SparkUtils.getEmptyBlockRDD(sc, mcIn) );
                }
-               
+
                //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey(out, false); 
-               
+               out = RDDAggregateUtils.mergeByKey(out, false);
+
                return out;
        }
-       
+
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
coordinateMatrixToBinaryBlock(SparkContext sc,
-                       CoordinateMatrix input, MatrixCharacteristics mcIn, 
boolean outputEmptyBlocks) throws DMLRuntimeException 
+                       CoordinateMatrix input, MatrixCharacteristics mcIn, 
boolean outputEmptyBlocks) throws DMLRuntimeException
        {
                return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), 
input, mcIn, true);
        }
@@ -128,19 +128,19 @@ public class RDDConverterUtilsExt
                }
                return df.select(columns.get(0), 
scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
        }
-       
+
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, 
long clen) throws DMLRuntimeException {
                return convertPy4JArrayToMB(data, (int)rlen, (int)clen, false);
        }
-       
+
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, 
int clen) throws DMLRuntimeException {
                return convertPy4JArrayToMB(data, rlen, clen, false);
        }
-       
+
        public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] 
row, byte [] col, long rlen, long clen, long nnz) throws DMLRuntimeException {
                return convertSciPyCOOToMB(data, row, col, (int)rlen, 
(int)clen, (int)nnz);
        }
-       
+
        public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] 
row, byte [] col, int rlen, int clen, int nnz) throws DMLRuntimeException {
                MatrixBlock mb = new MatrixBlock(rlen, clen, true);
                mb.allocateSparseRowsBlock(false);
@@ -154,17 +154,17 @@ public class RDDConverterUtilsExt
                        double val = buf1.getDouble();
                        int rowIndex = buf2.getInt();
                        int colIndex = buf3.getInt();
-                       mb.setValue(rowIndex, colIndex, val); 
+                       mb.setValue(rowIndex, colIndex, val);
                }
                mb.recomputeNonZeros();
                mb.examSparsity();
                return mb;
        }
-       
+
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, 
long clen, boolean isSparse) throws DMLRuntimeException {
                return convertPy4JArrayToMB(data, (int) rlen, (int) clen, 
isSparse);
        }
-       
+
        public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, 
boolean isSparse) {
                MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
                ret.allocateDenseOrSparseBlock();
@@ -176,7 +176,7 @@ public class RDDConverterUtilsExt
                }
                return allocateDenseOrSparse(rlen, clen, isSparse);
        }
-       
+
        public static void copyRowBlocks(MatrixBlock mb, int rowIndex, 
MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws 
DMLRuntimeException {
                copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, 
(long)rlen, (long)clen);
        }
@@ -192,12 +192,12 @@ public class RDDConverterUtilsExt
                        ret.copy((int)(rowIndex*numRowsPerBlock), 
(int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, 
false);
                // }
        }
-       
+
        public static void postProcessAfterCopying(MatrixBlock ret) throws 
DMLRuntimeException {
                ret.recomputeNonZeros();
                ret.examSparsity();
        }
-       
+
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, 
int clen, boolean isSparse) throws DMLRuntimeException {
                MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1);
                if(isSparse) {
@@ -219,19 +219,19 @@ public class RDDConverterUtilsExt
                mb.examSparsity();
                return mb;
        }
-       
+
        public static byte [] convertMBtoPy4JDenseArr(MatrixBlock mb) throws 
DMLRuntimeException {
                byte [] ret = null;
                if(mb.isInSparseFormat()) {
                        mb.sparseToDense();
                }
-               
+
                long limit = mb.getNumRows()*mb.getNumColumns();
                int times = Double.SIZE / Byte.SIZE;
                if( limit > Integer.MAX_VALUE / times )
                        throw new DMLRuntimeException("MatrixBlock of size " + 
limit + " cannot be converted to dense numpy array");
                ret = new byte[(int) (limit * times)];
-               
+
                double [] denseBlock = mb.getDenseBlock();
                if(mb.isEmptyBlock()) {
                        for(int i=0;i < limit;i++){
@@ -246,10 +246,10 @@ public class RDDConverterUtilsExt
                        ByteBuffer.wrap(ret, i*times, 
times).order(ByteOrder.nativeOrder()).putDouble(denseBlock[i]);
                        }
                }
-               
+
                return ret;
        }
-       
+
        public static class AddRowID implements Function<Tuple2<Row,Long>, Row> 
{
                private static final long serialVersionUID = 
-3733816995375745659L;
 
@@ -263,12 +263,12 @@ public class RDDConverterUtilsExt
                        fields[oldNumCols] = new Double(arg0._2 + 1);
                        return RowFactory.create(fields);
                }
-               
+
        }
 
        /**
         * Add element indices as new column to DataFrame
-        * 
+        *
         * @param df input data frame
         * @param sparkSession the Spark Session
         * @param nameOfCol name of index column
@@ -286,27 +286,10 @@ public class RDDConverterUtilsExt
                return sparkSession.createDataFrame(newRows, new 
StructType(newSchema));
        }
 
-       /**
-        * Add element indices as new column to DataFrame
-        * 
-        * @param df input data frame
-        * @param sqlContext the SQL Context
-        * @param nameOfCol name of index column
-        * @return new data frame
-        * 
-        * @deprecated This will be removed in SystemML 1.0.
-        */
-       @Deprecated
-       public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext 
sqlContext, String nameOfCol) {
-               SparkSession sparkSession = sqlContext.sparkSession();
-               return addIDToDataFrame(df, sparkSession, nameOfCol);
-       }
-       
-       
-       private static class MatrixEntryToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> 
+       private static class MatrixEntryToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
        {
                private static final long serialVersionUID = 
4907483236186747224L;
-               
+
                private IJVToBinaryBlockFunctionHelper helper = null;
                public MatrixEntryToBinaryBlockFunction(MatrixCharacteristics 
mc) throws DMLRuntimeException {
                        helper = new IJVToBinaryBlockFunctionHelper(mc);
@@ -318,18 +301,18 @@ public class RDDConverterUtilsExt
                }
 
        }
-       
+
        private static class IJVToBinaryBlockFunctionHelper implements 
Serializable {
                private static final long serialVersionUID = 
-7952801318564745821L;
                //internal buffer size (aligned w/ default matrix block size)
                private static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M 
elements (32MB)
                private int _bufflen = -1;
-               
+
                private long _rlen = -1;
                private long _clen = -1;
                private int _brlen = -1;
                private int _bclen = -1;
-               
+
                public IJVToBinaryBlockFunctionHelper(MatrixCharacteristics mc) 
throws DMLRuntimeException
                {
                        if(!mc.dimsKnown()) {
@@ -339,21 +322,21 @@ public class RDDConverterUtilsExt
                        _clen = mc.getCols();
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
-                       
+
                        //determine upper bounded buffer len
                        _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE);
-                       
+
                }
-               
+
                // ----------------------------------------------------
                // Can extend this by having type hierarchy
                public Tuple2<MatrixIndexes, MatrixCell> textToMatrixCell(Text 
txt) {
                        FastStringTokenizer st = new FastStringTokenizer(' ');
                        //get input string (ignore matrix market comments)
                        String strVal = txt.toString();
-                       if( strVal.startsWith("%") ) 
+                       if( strVal.startsWith("%") )
                                return null;
-                       
+
                        //parse input ijv triple
                        st.reset( strVal );
                        long row = st.nextLong();
@@ -363,19 +346,19 @@ public class RDDConverterUtilsExt
                        MatrixCell cell = new MatrixCell(val);
                        return new Tuple2<MatrixIndexes, MatrixCell>(indx, 
cell);
                }
-               
+
                public Tuple2<MatrixIndexes, MatrixCell> 
matrixEntryToMatrixCell(MatrixEntry entry) {
                        MatrixIndexes indx = new MatrixIndexes(entry.i(), 
entry.j());
                        MatrixCell cell = new MatrixCell(entry.value());
                        return new Tuple2<MatrixIndexes, MatrixCell>(indx, 
cell);
                }
-               
+
                // ----------------------------------------------------
-               
+
                Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
convertToBinaryBlock(Object arg0, RDDConverterTypes converter)  throws 
Exception {
                        ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
                        ReblockBuffer rbuff = new ReblockBuffer(_bufflen, 
_rlen, _clen, _brlen, _bclen);
-               
+
                        Iterator<?> iter = (Iterator<?>) arg0;
                        while( iter.hasNext() ) {
                                Tuple2<MatrixIndexes, MatrixCell> cell = null;
@@ -383,38 +366,38 @@ public class RDDConverterUtilsExt
                                        case MATRIXENTRY_TO_MATRIXCELL:
                                                cell = 
matrixEntryToMatrixCell((MatrixEntry) iter.next());
                                                break;
-                                               
+
                                        case TEXT_TO_MATRIX_CELL:
                                                cell = textToMatrixCell((Text) 
iter.next());
                                                break;
-                                       
+
                                        default:
                                                throw new Exception("Invalid 
converter for IJV data:" + converter.toString());
                                }
-                               
+
                                if(cell == null) {
                                        continue;
                                }
-                               
+
                                //flush buffer if necessary
                                if( rbuff.getSize() >= rbuff.getCapacity() )
                                        flushBufferToList(rbuff, ret);
-                               
+
                                //add value to reblock buffer
                                rbuff.appendCell(cell._1.getRowIndex(), 
cell._1.getColumnIndex(), cell._2.getValue());
                        }
-                       
+
                        //final flush buffer
                        flushBufferToList(rbuff, ret);
-               
+
                        return ret;
                }
 
-               private void flushBufferToList( ReblockBuffer rbuff,  
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+               private void flushBufferToList( ReblockBuffer rbuff,  
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
                        throws IOException, DMLRuntimeException
                {
                        //temporary list of indexed matrix values to prevent 
library dependencies
-                       ArrayList<IndexedMatrixValue> rettmp = new 
ArrayList<IndexedMatrixValue>();                                     
+                       ArrayList<IndexedMatrixValue> rettmp = new 
ArrayList<IndexedMatrixValue>();
                        rbuff.flushBufferToBinaryBlocks(rettmp);
                        ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
                }
@@ -423,50 +406,17 @@ public class RDDConverterUtilsExt
        /**
         * Convert a dataframe of comma-separated string rows to a dataframe of
         * ml.linalg.Vector rows.
-        * 
-        * <p>
-        * Example input rows:<br>
-        * 
-        * <code>
-        * ((1.2, 4.3, 3.4))<br>
-        * (1.2, 3.4, 2.2)<br>
-        * [[1.2, 34.3, 1.2, 1.25]]<br>
-        * [1.2, 3.4]<br>
-        * </code>
-        * 
-        * @param sqlContext
-        *            Spark SQL Context
-        * @param inputDF
-        *            dataframe of comma-separated row strings to convert to
-        *            dataframe of ml.linalg.Vector rows
-        * @return dataframe of ml.linalg.Vector rows
-        * @throws DMLRuntimeException
-        *             if DMLRuntimeException occurs
-        *             
-        * @deprecated This will be removed in SystemML 1.0. Please migrate to 
{@code
-        * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, 
Dataset<Row>) }
-        */
-       @Deprecated
-       public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext 
sqlContext, Dataset<Row> inputDF)
-                       throws DMLRuntimeException {
-               SparkSession sparkSession = sqlContext.sparkSession();
-               return stringDataFrameToVectorDataFrame(sparkSession, inputDF);
-       }
-       
-       /**
-        * Convert a dataframe of comma-separated string rows to a dataframe of
-        * ml.linalg.Vector rows.
-        * 
+        *
         * <p>
         * Example input rows:<br>
-        * 
+        *
         * <code>
         * ((1.2, 4.3, 3.4))<br>
         * (1.2, 3.4, 2.2)<br>
         * [[1.2, 34.3, 1.2, 1.25]]<br>
         * [1.2, 3.4]<br>
         * </code>
-        * 
+        *
         * @param sparkSession
         *            Spark Session
         * @param inputDF

Reply via email to