[MINOR] Performance createvar instruction, cleanup data handles

This patch improves the performance of string concatenation for
createvar instructions with unique filenames and removes the unnecessary
variable name from all data objects (matrices, frames, scalars).
 

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/ffefd8e6
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/ffefd8e6
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/ffefd8e6

Branch: refs/heads/master
Commit: ffefd8e68defb7eb5412f92ddfd8e48046e6532c
Parents: c961432
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Thu Nov 9 20:03:47 2017 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Thu Nov 9 22:08:02 2017 -0800

----------------------------------------------------------------------
 .../apache/sysml/api/jmlc/PreparedScript.java   |   8 +-
 .../api/mlcontext/MLContextConversionUtil.java  | 138 +++++++------------
 .../sysml/api/mlcontext/MLContextUtil.java      |  38 ++---
 .../runtime/controlprogram/ForProgramBlock.java |  12 +-
 .../controlprogram/ParForProgramBlock.java      |  16 +--
 .../runtime/controlprogram/ProgramBlock.java    |   8 +-
 .../controlprogram/caching/CacheableData.java   |  42 +++---
 .../controlprogram/caching/FrameObject.java     |   7 +-
 .../controlprogram/caching/MatrixObject.java    |  13 +-
 .../context/ExecutionContext.java               |   5 +-
 .../context/SparkExecutionContext.java          |  22 +--
 .../controlprogram/parfor/DataPartitioner.java  |   9 +-
 .../controlprogram/parfor/ParWorker.java        |  11 +-
 .../controlprogram/parfor/ProgramConverter.java |  12 +-
 .../parfor/RemoteDPParForSparkWorker.java       |   4 +-
 .../parfor/RemoteDPParWorkerReducer.java        |   4 +-
 .../parfor/ResultMergeLocalFile.java            |  59 ++++----
 .../parfor/ResultMergeLocalMemory.java          |  26 ++--
 .../parfor/ResultMergeRemoteMR.java             |  10 +-
 .../parfor/ResultMergeRemoteSpark.java          |  20 ++-
 .../runtime/controlprogram/parfor/Task.java     |  31 +++--
 .../parfor/TaskPartitionerFactoring.java        |  22 ++-
 .../parfor/TaskPartitionerFixedsize.java        |  24 ++--
 .../cp/AggregateUnaryCPInstruction.java         |   8 +-
 .../runtime/instructions/cp/BooleanObject.java  |   8 +-
 .../cp/CentralMomentCPInstruction.java          |   3 +-
 .../cp/CovarianceCPInstruction.java             |   4 +-
 .../sysml/runtime/instructions/cp/Data.java     |  17 +--
 .../runtime/instructions/cp/DoubleObject.java   |   6 +-
 .../cp/FunctionCallCPInstruction.java           |   2 -
 .../runtime/instructions/cp/IntObject.java      |   8 +-
 .../runtime/instructions/cp/ScalarObject.java   |  11 +-
 .../runtime/instructions/cp/StringObject.java   |   8 +-
 .../cp/UaggOuterChainCPInstruction.java         |   2 +-
 .../instructions/cp/VariableCPInstruction.java  |  11 +-
 .../cpfile/MatrixIndexingCPFileInstruction.java |   7 +-
 .../ParameterizedBuiltinCPFileInstruction.java  |  13 +-
 .../spark/CentralMomentSPInstruction.java       |   3 +-
 .../spark/CheckpointSPInstruction.java          |   2 +-
 .../spark/CovarianceSPInstruction.java          |   3 +-
 .../spark/data/BroadcastObject.java             |   4 +-
 .../instructions/spark/data/DatasetObject.java  |   8 +-
 .../instructions/spark/data/LineageObject.java  |  10 +-
 .../instructions/spark/data/RDDObject.java      |   4 +-
 .../ExternalFunctionInvocationInstruction.java  |  12 +-
 45 files changed, 275 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/api/jmlc/PreparedScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/PreparedScript.java 
b/src/main/java/org/apache/sysml/api/jmlc/PreparedScript.java
index 3dc0db7..dac211e 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/PreparedScript.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/PreparedScript.java
@@ -142,7 +142,7 @@ public class PreparedScript
         * @throws DMLException if DMLException occurs
         */
        public void setScalar(String varname, boolean scalar, boolean reuse) 
throws DMLException {
-               setScalar(varname, new BooleanObject(varname, scalar), reuse);
+               setScalar(varname, new BooleanObject(scalar), reuse);
        }
        
        /**
@@ -165,7 +165,7 @@ public class PreparedScript
         * @throws DMLException if DMLException occurs
         */
        public void setScalar(String varname, long scalar, boolean reuse) 
throws DMLException {
-               setScalar(varname, new IntObject(varname, scalar), reuse);
+               setScalar(varname, new IntObject(scalar), reuse);
        }
        
        /** Binds a scalar double to a registered input variable.
@@ -187,7 +187,7 @@ public class PreparedScript
         * @throws DMLException if DMLException occurs
         */
        public void setScalar(String varname, double scalar, boolean reuse) 
throws DMLException {
-               setScalar(varname, new DoubleObject(varname, scalar), reuse);
+               setScalar(varname, new DoubleObject(scalar), reuse);
        }
        
        /**
@@ -210,7 +210,7 @@ public class PreparedScript
         * @throws DMLException if DMLException occurs
         */
        public void setScalar(String varname, String scalar, boolean reuse) 
throws DMLException {
-               setScalar(varname, new StringObject(varname, scalar), reuse);
+               setScalar(varname, new StringObject(scalar), reuse);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 41e8551..1536f8f 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -124,23 +124,21 @@ public class MLContextConversionUtil {
        /**
         * Convert a matrix at a URL to a {@code MatrixObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param url
         *            the URL to a matrix (in CSV or IJV format)
         * @param matrixMetadata
         *            the matrix metadata
         * @return the matrix at a URL converted to a {@code MatrixObject}
         */
-       public static MatrixObject urlToMatrixObject(String variableName, URL 
url, MatrixMetadata matrixMetadata) {
+       public static MatrixObject urlToMatrixObject(URL url, MatrixMetadata 
matrixMetadata) {
                try {
                        InputStream is = url.openStream();
                        List<String> lines = IOUtils.readLines(is);
                        JavaRDD<String> javaRDD = jsc().parallelize(lines);
                        if ((matrixMetadata == null) || 
(matrixMetadata.getMatrixFormat() == MatrixFormat.CSV)) {
-                               return 
javaRDDStringCSVToMatrixObject(variableName, javaRDD, matrixMetadata);
+                               return javaRDDStringCSVToMatrixObject(javaRDD, 
matrixMetadata);
                        } else if (matrixMetadata.getMatrixFormat() == 
MatrixFormat.IJV) {
-                               return 
javaRDDStringIJVToMatrixObject(variableName, javaRDD, matrixMetadata);
+                               return javaRDDStringIJVToMatrixObject(javaRDD, 
matrixMetadata);
                        }
                        return null;
                } catch (Exception e) {
@@ -206,25 +204,21 @@ public class MLContextConversionUtil {
         * Convert a {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} to a
         * {@code MatrixObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param binaryBlocks
         *            {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} 
representation
         *            of a binary-block matrix
         * @return the {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} matrix
         *         converted to a {@code MatrixObject}
         */
-       public static MatrixObject binaryBlocksToMatrixObject(String 
variableName,
+       public static MatrixObject binaryBlocksToMatrixObject(
                        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks) {
-               return binaryBlocksToMatrixObject(variableName, binaryBlocks, 
null);
+               return binaryBlocksToMatrixObject(binaryBlocks, null);
        }
 
        /**
         * Convert a {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} to a
         * {@code MatrixObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param binaryBlocks
         *            {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} 
representation
         *            of a binary-block matrix
@@ -233,21 +227,20 @@ public class MLContextConversionUtil {
         * @return the {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} matrix
         *         converted to a {@code MatrixObject}
         */
-       public static MatrixObject binaryBlocksToMatrixObject(String 
variableName,
+       public static MatrixObject binaryBlocksToMatrixObject(
                        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks, 
MatrixMetadata matrixMetadata) {
-               return binaryBlocksToMatrixObject(variableName, binaryBlocks, 
matrixMetadata, true);
+               return binaryBlocksToMatrixObject(binaryBlocks, matrixMetadata, 
true);
        }
 
-       private static MatrixObject binaryBlocksToMatrixObject(String 
variableName,
-                       JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks, 
MatrixMetadata matrixMetadata, boolean copy) {
+       private static MatrixObject 
binaryBlocksToMatrixObject(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks,
+               MatrixMetadata matrixMetadata, boolean copy) {
 
                MatrixCharacteristics mc = (matrixMetadata != null) ? 
matrixMetadata.asMatrixCharacteristics()
                                : new MatrixCharacteristics();
                JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRdd = 
SparkUtils.copyBinaryBlockMatrix(binaryBlocks, copy);
-
                MatrixObject matrixObject = new MatrixObject(ValueType.DOUBLE, 
OptimizerUtils.getUniqueTempFileName(),
                                new MetaDataFormat(mc, 
OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
-               matrixObject.setRDDHandle(new RDDObject(javaPairRdd, 
variableName));
+               matrixObject.setRDDHandle(new RDDObject(javaPairRdd));
                return matrixObject;
        }
 
@@ -279,24 +272,19 @@ public class MLContextConversionUtil {
        /**
         * Convert a {@code JavaPairRDD<Long, FrameBlock>} to a {@code 
FrameObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param binaryBlocks
         *            {@code JavaPairRDD<Long, FrameBlock>} representation of a
         *            binary-block frame
         * @return the {@code JavaPairRDD<Long, FrameBlock>} frame converted to 
a
         *         {@code FrameObject}
         */
-       public static FrameObject binaryBlocksToFrameObject(String variableName,
-                       JavaPairRDD<Long, FrameBlock> binaryBlocks) {
-               return binaryBlocksToFrameObject(variableName, binaryBlocks, 
null);
+       public static FrameObject binaryBlocksToFrameObject(JavaPairRDD<Long, 
FrameBlock> binaryBlocks) {
+               return binaryBlocksToFrameObject(binaryBlocks, null);
        }
 
        /**
         * Convert a {@code JavaPairRDD<Long, FrameBlock>} to a {@code 
FrameObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param binaryBlocks
         *            {@code JavaPairRDD<Long, FrameBlock>} representation of a
         *            binary-block frame
@@ -305,7 +293,7 @@ public class MLContextConversionUtil {
         * @return the {@code JavaPairRDD<Long, FrameBlock>} frame converted to 
a
         *         {@code FrameObject}
         */
-       public static FrameObject binaryBlocksToFrameObject(String 
variableName, JavaPairRDD<Long, FrameBlock> binaryBlocks,
+       public static FrameObject binaryBlocksToFrameObject(JavaPairRDD<Long, 
FrameBlock> binaryBlocks,
                        FrameMetadata frameMetadata) {
 
                MatrixCharacteristics mc = (frameMetadata != null) ? 
@@ -316,29 +304,25 @@ public class MLContextConversionUtil {
                
                FrameObject frameObject = new 
FrameObject(OptimizerUtils.getUniqueTempFileName(),
                        new MetaDataFormat(mc, 
OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo), schema);
-               frameObject.setRDDHandle(new RDDObject(binaryBlocks, 
variableName));
+               frameObject.setRDDHandle(new RDDObject(binaryBlocks));
                return frameObject;
        }
 
        /**
         * Convert a {@code DataFrame} to a {@code MatrixObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param dataFrame
         *            the Spark {@code DataFrame}
         * @return the {@code DataFrame} matrix converted to a
         *         {@code MatrixObject}
         */
-       public static MatrixObject dataFrameToMatrixObject(String variableName, 
Dataset<Row> dataFrame) {
-               return dataFrameToMatrixObject(variableName, dataFrame, null);
+       public static MatrixObject dataFrameToMatrixObject(Dataset<Row> 
dataFrame) {
+               return dataFrameToMatrixObject(dataFrame, null);
        }
 
        /**
         * Convert a {@code DataFrame} to a {@code MatrixObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param dataFrame
         *            the Spark {@code DataFrame}
         * @param matrixMetadata
@@ -346,14 +330,14 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} matrix converted to a
         *         {@code MatrixObject}
         */
-       public static MatrixObject dataFrameToMatrixObject(String variableName, 
Dataset<Row> dataFrame,
+       public static MatrixObject dataFrameToMatrixObject(Dataset<Row> 
dataFrame,
                        MatrixMetadata matrixMetadata) {
                matrixMetadata = (matrixMetadata != null) ? matrixMetadata : 
new MatrixMetadata();
                JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = 
dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata);
-               MatrixObject mo = binaryBlocksToMatrixObject(variableName, 
binaryBlock, matrixMetadata, false);
+               MatrixObject mo = binaryBlocksToMatrixObject(binaryBlock, 
matrixMetadata, false);
                // keep lineage of original dataset to allow bypassing binary 
block
                // conversion if possible
-               mo.getRDDHandle().addLineageChild(new DatasetObject(dataFrame, 
variableName,
+               mo.getRDDHandle().addLineageChild(new DatasetObject(dataFrame,
                                isDataFrameWithIDColumn(matrixMetadata), 
isVectorBasedDataFrame(matrixMetadata)));
                return mo;
        }
@@ -361,22 +345,18 @@ public class MLContextConversionUtil {
        /**
         * Convert a {@code DataFrame} to a {@code FrameObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param dataFrame
         *            the Spark {@code DataFrame}
         * @return the {@code DataFrame} frame converted to a
         *         {@code FrameObject}
         */
-       public static FrameObject dataFrameToFrameObject(String variableName, 
Dataset<Row> dataFrame) {
-               return dataFrameToFrameObject(variableName, dataFrame, null);
+       public static FrameObject dataFrameToFrameObject(Dataset<Row> 
dataFrame) {
+               return dataFrameToFrameObject(dataFrame, null);
        }
 
        /**
         * Convert a {@code DataFrame} to a {@code FrameObject}.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param dataFrame
         *            the Spark {@code DataFrame}
         * @param frameMetadata
@@ -384,7 +364,7 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} frame converted to a
         *         {@code FrameObject}
         */
-       public static FrameObject dataFrameToFrameObject(String variableName, 
Dataset<Row> dataFrame,
+       public static FrameObject dataFrameToFrameObject(Dataset<Row> dataFrame,
                        FrameMetadata frameMetadata) {
                try {
                        // setup meta data and java spark context
@@ -405,7 +385,7 @@ public class MLContextConversionUtil {
                        frameMetadata.setMatrixCharacteristics(mc); // required 
due to meta
                                                                                
                                // data copy
 
-                       return 
MLContextConversionUtil.binaryBlocksToFrameObject(variableName, binaryBlock, 
frameMetadata);
+                       return 
MLContextConversionUtil.binaryBlocksToFrameObject(binaryBlock, frameMetadata);
                } catch (DMLRuntimeException e) {
                        throw new MLContextException("Exception converting 
DataFrame to FrameObject", e);
                }
@@ -590,28 +570,24 @@ public class MLContextConversionUtil {
        /**
         * Convert a {@code JavaRDD<String>} in CSV format to a {@code 
MatrixObject}
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param javaRDD
         *            the Java RDD of strings
         * @return the {@code JavaRDD<String>} converted to a {@code 
MatrixObject}
         */
-       public static MatrixObject javaRDDStringCSVToMatrixObject(String 
variableName, JavaRDD<String> javaRDD) {
-               return javaRDDStringCSVToMatrixObject(variableName, javaRDD, 
null);
+       public static MatrixObject 
javaRDDStringCSVToMatrixObject(JavaRDD<String> javaRDD) {
+               return javaRDDStringCSVToMatrixObject(javaRDD, null);
        }
 
        /**
         * Convert a {@code JavaRDD<String>} in CSV format to a {@code 
MatrixObject}
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param javaRDD
         *            the Java RDD of strings
         * @param matrixMetadata
         *            matrix metadata
         * @return the {@code JavaRDD<String>} converted to a {@code 
MatrixObject}
         */
-       public static MatrixObject javaRDDStringCSVToMatrixObject(String 
variableName, JavaRDD<String> javaRDD,
+       public static MatrixObject 
javaRDDStringCSVToMatrixObject(JavaRDD<String> javaRDD,
                        MatrixMetadata matrixMetadata) {
                JavaPairRDD<LongWritable, Text> javaPairRDD = 
javaRDD.mapToPair(new ConvertStringToLongTextPair());
                MatrixCharacteristics mc = (matrixMetadata != null) ? 
matrixMetadata.asMatrixCharacteristics()
@@ -620,35 +596,31 @@ public class MLContextConversionUtil {
                MatrixObject matrixObject = new MatrixObject(ValueType.DOUBLE, 
OptimizerUtils.getUniqueTempFileName(),
                                new MetaDataFormat(mc, 
OutputInfo.CSVOutputInfo, InputInfo.CSVInputInfo));
                JavaPairRDD<LongWritable, Text> javaPairRDD2 = 
javaPairRDD.mapToPair(new CopyTextInputFunction());
-               matrixObject.setRDDHandle(new RDDObject(javaPairRDD2, 
variableName));
+               matrixObject.setRDDHandle(new RDDObject(javaPairRDD2));
                return matrixObject;
        }
 
        /**
         * Convert a {@code JavaRDD<String>} in CSV format to a {@code 
FrameObject}
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param javaRDD
         *            the Java RDD of strings
         * @return the {@code JavaRDD<String>} converted to a {@code 
FrameObject}
         */
-       public static FrameObject javaRDDStringCSVToFrameObject(String 
variableName, JavaRDD<String> javaRDD) {
-               return javaRDDStringCSVToFrameObject(variableName, javaRDD, 
null);
+       public static FrameObject javaRDDStringCSVToFrameObject(JavaRDD<String> 
javaRDD) {
+               return javaRDDStringCSVToFrameObject(javaRDD, null);
        }
 
        /**
         * Convert a {@code JavaRDD<String>} in CSV format to a {@code 
FrameObject}
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param javaRDD
         *            the Java RDD of strings
         * @param frameMetadata
         *            frame metadata
         * @return the {@code JavaRDD<String>} converted to a {@code 
FrameObject}
         */
-       public static FrameObject javaRDDStringCSVToFrameObject(String 
variableName, JavaRDD<String> javaRDD,
+       public static FrameObject javaRDDStringCSVToFrameObject(JavaRDD<String> 
javaRDD,
                        FrameMetadata frameMetadata) {
                JavaPairRDD<LongWritable, Text> javaPairRDD = 
javaRDD.mapToPair(new ConvertStringToLongTextPair());
                MatrixCharacteristics mc = (frameMetadata != null) ? 
frameMetadata.asMatrixCharacteristics()
@@ -666,7 +638,7 @@ public class MLContextConversionUtil {
                        e.printStackTrace();
                        return null;
                }
-               frameObject.setRDDHandle(new RDDObject(rdd, variableName));
+               frameObject.setRDDHandle(new RDDObject(rdd));
                return frameObject;
        }
 
@@ -674,15 +646,13 @@ public class MLContextConversionUtil {
         * Convert a {@code JavaRDD<String>} in IJV format to a {@code 
MatrixObject}
         * . Note that metadata is required for IJV format.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param javaRDD
         *            the Java RDD of strings
         * @param matrixMetadata
         *            matrix metadata
         * @return the {@code JavaRDD<String>} converted to a {@code 
MatrixObject}
         */
-       public static MatrixObject javaRDDStringIJVToMatrixObject(String 
variableName, JavaRDD<String> javaRDD,
+       public static MatrixObject 
javaRDDStringIJVToMatrixObject(JavaRDD<String> javaRDD,
                        MatrixMetadata matrixMetadata) {
                JavaPairRDD<LongWritable, Text> javaPairRDD = 
javaRDD.mapToPair(new ConvertStringToLongTextPair());
                MatrixCharacteristics mc = (matrixMetadata != null) ? 
matrixMetadata.asMatrixCharacteristics()
@@ -691,7 +661,7 @@ public class MLContextConversionUtil {
                MatrixObject matrixObject = new MatrixObject(ValueType.DOUBLE, 
OptimizerUtils.getUniqueTempFileName(),
                                new MetaDataFormat(mc, 
OutputInfo.TextCellOutputInfo, InputInfo.TextCellInputInfo));
                JavaPairRDD<LongWritable, Text> javaPairRDD2 = 
javaPairRDD.mapToPair(new CopyTextInputFunction());
-               matrixObject.setRDDHandle(new RDDObject(javaPairRDD2, 
variableName));
+               matrixObject.setRDDHandle(new RDDObject(javaPairRDD2));
                return matrixObject;
        }
 
@@ -699,15 +669,13 @@ public class MLContextConversionUtil {
         * Convert a {@code JavaRDD<String>} in IJV format to a {@code 
FrameObject}
         * . Note that metadata is required for IJV format.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param javaRDD
         *            the Java RDD of strings
         * @param frameMetadata
         *            frame metadata
         * @return the {@code JavaRDD<String>} converted to a {@code 
FrameObject}
         */
-       public static FrameObject javaRDDStringIJVToFrameObject(String 
variableName, JavaRDD<String> javaRDD,
+       public static FrameObject javaRDDStringIJVToFrameObject(JavaRDD<String> 
javaRDD,
                        FrameMetadata frameMetadata) {
                JavaPairRDD<LongWritable, Text> javaPairRDD = 
javaRDD.mapToPair(new ConvertStringToLongTextPair());
                MatrixCharacteristics mc = (frameMetadata != null) ? 
frameMetadata.asMatrixCharacteristics()
@@ -728,108 +696,96 @@ public class MLContextConversionUtil {
                        e.printStackTrace();
                        return null;
                }
-               frameObject.setRDDHandle(new RDDObject(rdd, variableName));
+               frameObject.setRDDHandle(new RDDObject(rdd));
                return frameObject;
        }
 
        /**
         * Convert a {@code RDD<String>} in CSV format to a {@code MatrixObject}
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param rdd
         *            the RDD of strings
         * @return the {@code RDD<String>} converted to a {@code MatrixObject}
         */
-       public static MatrixObject rddStringCSVToMatrixObject(String 
variableName, RDD<String> rdd) {
-               return rddStringCSVToMatrixObject(variableName, rdd, null);
+       public static MatrixObject rddStringCSVToMatrixObject(RDD<String> rdd) {
+               return rddStringCSVToMatrixObject(rdd, null);
        }
 
        /**
         * Convert a {@code RDD<String>} in CSV format to a {@code MatrixObject}
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param rdd
         *            the RDD of strings
         * @param matrixMetadata
         *            matrix metadata
         * @return the {@code RDD<String>} converted to a {@code MatrixObject}
         */
-       public static MatrixObject rddStringCSVToMatrixObject(String 
variableName, RDD<String> rdd,
+       public static MatrixObject rddStringCSVToMatrixObject(RDD<String> rdd,
                        MatrixMetadata matrixMetadata) {
                ClassTag<String> tag = 
scala.reflect.ClassTag$.MODULE$.apply(String.class);
                JavaRDD<String> javaRDD = JavaRDD.fromRDD(rdd, tag);
-               return javaRDDStringCSVToMatrixObject(variableName, javaRDD, 
matrixMetadata);
+               return javaRDDStringCSVToMatrixObject(javaRDD, matrixMetadata);
        }
 
        /**
         * Convert a {@code RDD<String>} in CSV format to a {@code FrameObject}
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param rdd
         *            the RDD of strings
         * @return the {@code RDD<String>} converted to a {@code FrameObject}
         */
-       public static FrameObject rddStringCSVToFrameObject(String 
variableName, RDD<String> rdd) {
-               return rddStringCSVToFrameObject(variableName, rdd, null);
+       public static FrameObject rddStringCSVToFrameObject(RDD<String> rdd) {
+               return rddStringCSVToFrameObject(rdd, null);
        }
 
        /**
         * Convert a {@code RDD<String>} in CSV format to a {@code FrameObject}
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param rdd
         *            the RDD of strings
         * @param frameMetadata
         *            frame metadata
         * @return the {@code RDD<String>} converted to a {@code FrameObject}
         */
-       public static FrameObject rddStringCSVToFrameObject(String 
variableName, RDD<String> rdd,
+       public static FrameObject rddStringCSVToFrameObject(RDD<String> rdd,
                        FrameMetadata frameMetadata) {
                ClassTag<String> tag = 
scala.reflect.ClassTag$.MODULE$.apply(String.class);
                JavaRDD<String> javaRDD = JavaRDD.fromRDD(rdd, tag);
-               return javaRDDStringCSVToFrameObject(variableName, javaRDD, 
frameMetadata);
+               return javaRDDStringCSVToFrameObject(javaRDD, frameMetadata);
        }
 
        /**
         * Convert a {@code RDD<String>} in IJV format to a {@code 
MatrixObject}.
         * Note that metadata is required for IJV format.
         *
-        * @param variableName
-        *            name of the variable associated with the matrix
         * @param rdd
         *            the RDD of strings
         * @param matrixMetadata
         *            matrix metadata
         * @return the {@code RDD<String>} converted to a {@code MatrixObject}
         */
-       public static MatrixObject rddStringIJVToMatrixObject(String 
variableName, RDD<String> rdd,
+       public static MatrixObject rddStringIJVToMatrixObject(RDD<String> rdd,
                        MatrixMetadata matrixMetadata) {
                ClassTag<String> tag = 
scala.reflect.ClassTag$.MODULE$.apply(String.class);
                JavaRDD<String> javaRDD = JavaRDD.fromRDD(rdd, tag);
-               return javaRDDStringIJVToMatrixObject(variableName, javaRDD, 
matrixMetadata);
+               return javaRDDStringIJVToMatrixObject(javaRDD, matrixMetadata);
        }
 
        /**
         * Convert a {@code RDD<String>} in IJV format to a {@code FrameObject}.
         * Note that metadata is required for IJV format.
         *
-        * @param variableName
-        *            name of the variable associated with the frame
         * @param rdd
         *            the RDD of strings
         * @param frameMetadata
         *            frame metadata
         * @return the {@code RDD<String>} converted to a {@code FrameObject}
         */
-       public static FrameObject rddStringIJVToFrameObject(String 
variableName, RDD<String> rdd,
+       public static FrameObject rddStringIJVToFrameObject(RDD<String> rdd,
                        FrameMetadata frameMetadata) {
                ClassTag<String> tag = 
scala.reflect.ClassTag$.MODULE$.apply(String.class);
                JavaRDD<String> javaRDD = JavaRDD.fromRDD(rdd, tag);
-               return javaRDDStringIJVToFrameObject(variableName, javaRDD, 
frameMetadata);
+               return javaRDDStringIJVToFrameObject(javaRDD, frameMetadata);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 6c1788a..af99ab1 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -533,24 +533,24 @@ public final class MLContextUtil {
                        if (hasMatrixMetadata) {
                                MatrixMetadata matrixMetadata = 
(MatrixMetadata) metadata;
                                if (matrixMetadata.getMatrixFormat() == 
MatrixFormat.IJV) {
-                                       return 
MLContextConversionUtil.javaRDDStringIJVToMatrixObject(name, javaRDD, 
matrixMetadata);
+                                       return 
MLContextConversionUtil.javaRDDStringIJVToMatrixObject(javaRDD, matrixMetadata);
                                } else {
-                                       return 
MLContextConversionUtil.javaRDDStringCSVToMatrixObject(name, javaRDD, 
matrixMetadata);
+                                       return 
MLContextConversionUtil.javaRDDStringCSVToMatrixObject(javaRDD, matrixMetadata);
                                }
                        } else if (hasFrameMetadata) {
                                FrameMetadata frameMetadata = (FrameMetadata) 
metadata;
                                if (frameMetadata.getFrameFormat() == 
FrameFormat.IJV) {
-                                       return 
MLContextConversionUtil.javaRDDStringIJVToFrameObject(name, javaRDD, 
frameMetadata);
+                                       return 
MLContextConversionUtil.javaRDDStringIJVToFrameObject(javaRDD, frameMetadata);
                                } else {
-                                       return 
MLContextConversionUtil.javaRDDStringCSVToFrameObject(name, javaRDD, 
frameMetadata);
+                                       return 
MLContextConversionUtil.javaRDDStringCSVToFrameObject(javaRDD, frameMetadata);
                                }
                        } else if (!hasMetadata) {
                                String firstLine = javaRDD.first();
                                boolean isAllNumbers = 
isCSVLineAllNumbers(firstLine);
                                if (isAllNumbers) {
-                                       return 
MLContextConversionUtil.javaRDDStringCSVToMatrixObject(name, javaRDD);
+                                       return 
MLContextConversionUtil.javaRDDStringCSVToMatrixObject(javaRDD);
                                } else {
-                                       return 
MLContextConversionUtil.javaRDDStringCSVToFrameObject(name, javaRDD);
+                                       return 
MLContextConversionUtil.javaRDDStringCSVToFrameObject(javaRDD);
                                }
                        }
 
@@ -561,24 +561,24 @@ public final class MLContextUtil {
                        if (hasMatrixMetadata) {
                                MatrixMetadata matrixMetadata = 
(MatrixMetadata) metadata;
                                if (matrixMetadata.getMatrixFormat() == 
MatrixFormat.IJV) {
-                                       return 
MLContextConversionUtil.rddStringIJVToMatrixObject(name, rdd, matrixMetadata);
+                                       return 
MLContextConversionUtil.rddStringIJVToMatrixObject(rdd, matrixMetadata);
                                } else {
-                                       return 
MLContextConversionUtil.rddStringCSVToMatrixObject(name, rdd, matrixMetadata);
+                                       return 
MLContextConversionUtil.rddStringCSVToMatrixObject(rdd, matrixMetadata);
                                }
                        } else if (hasFrameMetadata) {
                                FrameMetadata frameMetadata = (FrameMetadata) 
metadata;
                                if (frameMetadata.getFrameFormat() == 
FrameFormat.IJV) {
-                                       return 
MLContextConversionUtil.rddStringIJVToFrameObject(name, rdd, frameMetadata);
+                                       return 
MLContextConversionUtil.rddStringIJVToFrameObject(rdd, frameMetadata);
                                } else {
-                                       return 
MLContextConversionUtil.rddStringCSVToFrameObject(name, rdd, frameMetadata);
+                                       return 
MLContextConversionUtil.rddStringCSVToFrameObject(rdd, frameMetadata);
                                }
                        } else if (!hasMetadata) {
                                String firstLine = rdd.first();
                                boolean isAllNumbers = 
isCSVLineAllNumbers(firstLine);
                                if (isAllNumbers) {
-                                       return 
MLContextConversionUtil.rddStringCSVToMatrixObject(name, rdd);
+                                       return 
MLContextConversionUtil.rddStringCSVToMatrixObject(rdd);
                                } else {
-                                       return 
MLContextConversionUtil.rddStringCSVToFrameObject(name, rdd);
+                                       return 
MLContextConversionUtil.rddStringCSVToFrameObject(rdd);
                                }
                        }
                } else if (value instanceof MatrixBlock) {
@@ -593,15 +593,15 @@ public final class MLContextUtil {
 
                        dataFrame = MLUtils.convertVectorColumnsToML(dataFrame);
                        if (hasMatrixMetadata) {
-                               return 
MLContextConversionUtil.dataFrameToMatrixObject(name, dataFrame, 
(MatrixMetadata) metadata);
+                               return 
MLContextConversionUtil.dataFrameToMatrixObject(dataFrame, (MatrixMetadata) 
metadata);
                        } else if (hasFrameMetadata) {
-                               return 
MLContextConversionUtil.dataFrameToFrameObject(name, dataFrame, (FrameMetadata) 
metadata);
+                               return 
MLContextConversionUtil.dataFrameToFrameObject(dataFrame, (FrameMetadata) 
metadata);
                        } else if (!hasMetadata) {
                                boolean looksLikeMatrix = 
doesDataFrameLookLikeMatrix(dataFrame);
                                if (looksLikeMatrix) {
-                                       return 
MLContextConversionUtil.dataFrameToMatrixObject(name, dataFrame);
+                                       return 
MLContextConversionUtil.dataFrameToMatrixObject(dataFrame);
                                } else {
-                                       return 
MLContextConversionUtil.dataFrameToFrameObject(name, dataFrame);
+                                       return 
MLContextConversionUtil.dataFrameToFrameObject(dataFrame);
                                }
                        }
                } else if (value instanceof Matrix) {
@@ -611,7 +611,7 @@ public final class MLContextUtil {
                                        metadata = matrix.getMatrixMetadata();
                                }
                                JavaPairRDD<MatrixIndexes, MatrixBlock> 
binaryBlocks = matrix.toBinaryBlocks();
-                               return 
MLContextConversionUtil.binaryBlocksToMatrixObject(name, binaryBlocks,
+                               return 
MLContextConversionUtil.binaryBlocksToMatrixObject(binaryBlocks,
                                                (MatrixMetadata) metadata);
                        } else {
                                return matrix.toMatrixObject();
@@ -623,7 +623,7 @@ public final class MLContextUtil {
                                        metadata = frame.getFrameMetadata();
                                }
                                JavaPairRDD<Long, FrameBlock> binaryBlocks = 
frame.toBinaryBlocks();
-                               return 
MLContextConversionUtil.binaryBlocksToFrameObject(name, binaryBlocks, 
(FrameMetadata) metadata);
+                               return 
MLContextConversionUtil.binaryBlocksToFrameObject(binaryBlocks, (FrameMetadata) 
metadata);
                        } else {
                                return frame.toFrameObject();
                        }
@@ -632,7 +632,7 @@ public final class MLContextUtil {
                        return 
MLContextConversionUtil.doubleMatrixToMatrixObject(name, doubleMatrix, 
(MatrixMetadata) metadata);
                } else if (value instanceof URL) {
                        URL url = (URL) value;
-                       return MLContextConversionUtil.urlToMatrixObject(name, 
url, (MatrixMetadata) metadata);
+                       return MLContextConversionUtil.urlToMatrixObject(url, 
(MatrixMetadata) metadata);
                } else if (value instanceof Integer) {
                        return new IntObject((Integer) value);
                } else if (value instanceof Double) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/ForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ForProgramBlock.java
index ec7a16f..c28825e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ForProgramBlock.java
@@ -121,7 +121,7 @@ public class ForProgramBlock extends ProgramBlock
                        UpdateType[] flags = prepareUpdateInPlaceVariables(ec, 
_tid);
                        
                        // run for loop body for each instance of predicate 
sequence 
-                       SequenceIterator seqIter = new 
SequenceIterator(_iterPredVar, from, to, incr);
+                       SequenceIterator seqIter = new SequenceIterator(from, 
to, incr);
                        for( IntObject iterVar : seqIter ) 
                        {
                                //set iteration variable
@@ -131,7 +131,7 @@ public class ForProgramBlock extends ProgramBlock
                                for(int i=0 ; i < this._childBlocks.size() ; 
i++) {
                                        ec.updateDebugState( i );
                                        _childBlocks.get(i).execute(ec);
-                               }                               
+                               }
                        }
                        
                        // reset update-in-place variables
@@ -201,7 +201,7 @@ public class ForProgramBlock extends ProgramBlock
                if( tmp instanceof IntObject )
                        ret = (IntObject)tmp;
                else //downcast to int if necessary
-                       ret = new IntObject(tmp.getName(),tmp.getLongValue()); 
+                       ret = new IntObject(tmp.getLongValue()); 
                
                return ret;
        }
@@ -216,14 +216,12 @@ public class ForProgramBlock extends ProgramBlock
         */
        protected class SequenceIterator implements Iterator<IntObject>, 
Iterable<IntObject>
        {
-               private String _varName = null;
                private long _cur = -1;
                private long _to = -1;
                private long _incr = -1;
                private boolean _inuse = false;
                
-               protected SequenceIterator(String varName, IntObject from, 
IntObject to, IntObject incr) {
-                       _varName = varName;
+               protected SequenceIterator(IntObject from, IntObject to, 
IntObject incr) {
                        _cur = from.getLongValue();
                        _to = to.getLongValue();
                        _incr = incr.getLongValue();
@@ -236,7 +234,7 @@ public class ForProgramBlock extends ProgramBlock
 
                @Override
                public IntObject next() {
-                       IntObject ret = new IntObject( _varName, _cur );
+                       IntObject ret = new IntObject(_cur);
                        _cur += _incr; //update current val
                        return ret;
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 760ddff..4775494 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -616,7 +616,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_DATA_T, time.stop());
                
                // initialize iter var to form value
-               IntObject iterVar = new IntObject(_iterPredVar, 
from.getLongValue() );
+               IntObject iterVar = new IntObject(from.getLongValue());
                
                ///////
                //begin PARALLEL EXECUTION of (PAR)FOR body
@@ -674,7 +674,7 @@ public class ParForProgramBlock extends ForProgramBlock
                cleanupSharedVariables(ec, varState);
                
                //set iteration var to TO value (+ increment) for FOR 
equivalence
-               iterVar = new IntObject(_iterPredVar, to.getLongValue()); 
//consistent with for
+               iterVar = new IntObject(to.getLongValue()); //consistent with 
for
                ec.setVariable(_iterPredVar, iterVar);
                
                //ensure that subsequent program blocks never see partitioned 
data (invalid plans!)
@@ -982,7 +982,7 @@ public class ParForProgramBlock extends ForProgramBlock
                OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PartitionFormat.COLUMN_WISE)
                        || (inputMatrix.getSparsity()<0.001 && 
inputDPF==PartitionFormat.ROW_WISE)) ?
                        OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
-               RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, resultFile, 
+               RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, 
_iterPredVar, _colocatedDPMatrix, program, resultFile, 
                        inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP );
                
                if( _monitor )
@@ -1107,7 +1107,7 @@ public class ParForProgramBlock extends ForProgramBlock
                // Step 4) submit MR job (wait for finished work)
                //TODO runtime support for binary cell partitioning 
                OutputInfo inputOI = OutputInfo.BinaryBlockOutputInfo;
-               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program,
+               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
_iterPredVar, _colocatedDPMatrix, program,
                        clsMap, resultFile, inputMatrix, ec, inputDPF, inputOI, 
_tSparseCol, _enableCPCaching, _numThreads );
                
                if( _monitor ) 
@@ -1273,10 +1273,10 @@ public class ParForProgramBlock extends ForProgramBlock
                                {
                                        case SCALAR:
                                                switch( valuetype ) {
-                                                       case BOOLEAN: dataObj = 
new BooleanObject(var,false); break;
-                                                       case INT:     dataObj = 
new IntObject(var,-1);        break;
-                                                       case DOUBLE:  dataObj = 
new DoubleObject(var,-1d);    break;
-                                                       case STRING:  dataObj = 
new StringObject(var,"-1");   break;
+                                                       case BOOLEAN: dataObj = 
new BooleanObject(false); break;
+                                                       case INT:     dataObj = 
new IntObject(-1);        break;
+                                                       case DOUBLE:  dataObj = 
new DoubleObject(-1d);    break;
+                                                       case STRING:  dataObj = 
new StringObject("-1");   break;
                                                        default:
                                                                throw new 
DMLRuntimeException("Value type not supported: "+valuetype);
                                                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
index 4ac9af0..34ba2fc 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
@@ -235,10 +235,10 @@ public class ProgramBlock implements ParseInfo
                //check and correct scalar ret type (incl save double to int)
                if( ret.getValueType() != retType )
                        switch( retType ) {
-                               case BOOLEAN: ret = new 
BooleanObject(ret.getName(),ret.getBooleanValue()); break;
-                               case INT:         ret = new 
IntObject(ret.getName(),ret.getLongValue()); break;
-                               case DOUBLE:  ret = new 
DoubleObject(ret.getName(),ret.getDoubleValue()); break;
-                               case STRING:  ret = new 
StringObject(ret.getName(),ret.getStringValue()); break;
+                               case BOOLEAN: ret = new 
BooleanObject(ret.getBooleanValue()); break;
+                               case INT:         ret = new 
IntObject(ret.getLongValue()); break;
+                               case DOUBLE:  ret = new 
DoubleObject(ret.getDoubleValue()); break;
+                               case STRING:  ret = new 
StringObject(ret.getStringValue()); break;
                                default:
                                        //do nothing
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index e41b06c..885fedb 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -186,7 +186,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
        // additional private flags and meta data
        private int     _numReadThreads = 0;   //number of threads for read 
from HDFS
        private boolean _cleanupFlag = true;   //flag if obj unpinned (cleanup 
enabled) 
-       private String  _varName = "";         //plan variable name
        private String  _cacheFileName = null; //local eviction file name
        private boolean _requiresLocalWrite = false; //flag if local write for 
read obj
        private boolean _isAcquireFromEmpty = false; //flag if read from status 
empty 
@@ -205,8 +204,8 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
         * @param vt value type
         */
        protected CacheableData(DataType dt, ValueType vt) {
-               super (dt, vt);         
-               _uniqueID = (int)_seq.getNextID();              
+               super (dt, vt);
+               _uniqueID = (int)_seq.getNextID();
                _cacheStatus = CacheStatus.EMPTY;
                _numReadThreads = 0;
                _gpuObjects = new HashMap<>();
@@ -222,7 +221,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                _cleanupFlag = that._cleanupFlag;
                _hdfsFileName = that._hdfsFileName;
                _hdfsFileExists = that._hdfsFileExists; 
-               _varName = that._varName;
                _gpuObjects = that._gpuObjects;
        }
 
@@ -247,14 +245,6 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                return _cleanupFlag;
        }
 
-       public void setVarName(String s) {
-               _varName = s;
-       }
-
-       public String getVarName() {
-               return _varName;
-       }
-
        public boolean isHDFSFileExists() {
                return _hdfsFileExists;
        }
@@ -384,7 +374,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws CacheException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire read "+getVarName());
+                       LOG.trace("Acquire read "+hashCode());
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if ( !isAvailableToRead() )
@@ -443,7 +433,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                                setDirty(false);
                        }
                        catch (IOException e) {
-                               throw new CacheException("Reading of " + 
_hdfsFileName + " ("+getVarName()+") failed.", e);
+                               throw new CacheException("Reading of " + 
_hdfsFileName + " ("+hashCode()+") failed.", e);
                        }
                        
                        _isAcquireFromEmpty = true;
@@ -481,7 +471,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws CacheException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire modify "+getVarName());
+                       LOG.trace("Acquire modify "+hashCode());
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if ( !isAvailableToModify() )
@@ -505,7 +495,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        }
                        catch (IOException e)
                        {
-                               throw new CacheException("Reading of " + 
_hdfsFileName + " ("+getVarName()+") failed.", e);
+                               throw new CacheException("Reading of " + 
_hdfsFileName + " ("+hashCode()+") failed.", e);
                        }
                }
 
@@ -544,7 +534,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws DMLRuntimeException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire modify newdata "+getVarName());
+                       LOG.trace("Acquire modify newdata "+hashCode());
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if (! isAvailableToModify ())
@@ -602,7 +592,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws CacheException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Release "+getVarName());
+                       LOG.trace("Release "+hashCode());
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                boolean write = false;
@@ -642,7 +632,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                                        }
                                }
                                catch (Exception e) {
-                                       throw new CacheException("Eviction to 
local path " + filePath + " ("+getVarName()+") failed.", e);
+                                       throw new CacheException("Eviction to 
local path " + filePath + " ("+hashCode()+") failed.", e);
                                }
                                _requiresLocalWrite = false;
                        }
@@ -652,7 +642,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        _data = null;                   
                }
                else if( LOG.isTraceEnabled() ){
-                       LOG.trace("Var "+getVarName()+" not subject to caching, 
state="+getStatusAsString());
+                       LOG.trace("Var "+hashCode()+" not subject to caching, 
state="+getStatusAsString());
                }
 
                if( DMLScript.STATISTICS ){
@@ -677,7 +667,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws DMLRuntimeException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Clear data "+getVarName());
+                       LOG.trace("Clear data "+hashCode());
                
                // check if cleanup enabled and possible 
                if( !isCleanupEnabled() ) 
@@ -774,7 +764,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                throws CacheException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Export data "+getVarName()+" "+fName);
+                       LOG.trace("Export data "+hashCode()+" "+fName);
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                //prevent concurrent modifications
@@ -826,7 +816,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                                }
                                catch (IOException e)
                                {
-                                   throw new CacheException("Reading of " + 
_hdfsFileName + " ("+getVarName()+") failed.", e);
+                                   throw new CacheException("Reading of " + 
_hdfsFileName + " ("+hashCode()+") failed.", e);
                                }
                        }
                        //get object from cache
@@ -926,7 +916,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                long begin = LOG.isTraceEnabled() ? System.currentTimeMillis() 
: 0;
                
                if( LOG.isTraceEnabled() )
-                       LOG.trace ("CACHE: Restoring matrix...  " + 
getVarName() + "  HDFS path: " + 
+                       LOG.trace ("CACHE: Restoring matrix...  " + hashCode() 
+ "  HDFS path: " + 
                                                (_hdfsFileName == null ? "null" 
: _hdfsFileName) + ", Restore from path: " + cacheFilePathAndName);
                                
                if (_data != null)
@@ -959,7 +949,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                String cacheFilePathAndName = getCacheFilePathAndName();
                long begin = LOG.isTraceEnabled() ? System.currentTimeMillis() 
: 0;
                if( LOG.isTraceEnabled() )
-                       LOG.trace("CACHE: Freeing evicted matrix...  " + 
getVarName() + "  HDFS path: " + 
+                       LOG.trace("CACHE: Freeing evicted matrix...  " + 
hashCode() + "  HDFS path: " + 
                                                (_hdfsFileName == null ? "null" 
: _hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
                
                LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
@@ -982,7 +972,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                String debugNameEnding = (_hdfsFileName == null ? "null" : 
                        (_hdfsFileName.length() < maxLength ? _hdfsFileName : 
"..." + 
                                _hdfsFileName.substring (_hdfsFileName.length() 
- maxLength + 3)));
-               return getVarName() + " " + debugNameEnding;
+               return hashCode() + " " + debugNameEnding;
        }
 
        protected T readBlobFromHDFS(String fname) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index b594777..10e924f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -223,11 +223,10 @@ public class FrameObject extends CacheableData<FrameBlock>
                catch(DMLRuntimeException ex) {
                        throw new IOException(ex);
                }
-                               
+               
                //sanity check correct output
-               if( fb == null ) {
-                       throw new IOException("Unable to load frame from rdd: 
"+lrdd.getVarName());
-               }
+               if( fb == null )
+                       throw new IOException("Unable to load frame from rdd.");
                
                return fb;
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 6486b02..0aaca69 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -99,7 +99,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
        public MatrixObject( ValueType vt, String file, MetaData mtd ) {
                super (DataType.MATRIX, vt);
                _metaData = mtd; 
-               _hdfsFileName = file;           
+               _hdfsFileName = file;
                _cache = null;
                _data = null;
        }
@@ -239,7 +239,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                throws CacheException
        {
                if( LOG.isTraceEnabled() )
-                       LOG.trace("Acquire partition "+getVarName()+" "+pred);
+                       LOG.trace("Acquire partition "+hashCode()+" "+pred);
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
                if ( !_partitioned )
@@ -424,7 +424,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                long begin = 0;
                
                if( LOG.isTraceEnabled() ) {
-                       LOG.trace("Reading matrix from HDFS...  " + 
getVarName() + "  Path: " + fname 
+                       LOG.trace("Reading matrix from HDFS...  " + hashCode() 
+ "  Path: " + fname 
                                        + ", dimensions: [" + mc.getRows() + ", 
" + mc.getCols() + ", " + mc.getNonZeros() + "]");
                        begin = System.currentTimeMillis();
                }
@@ -504,9 +504,8 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                }
                
                //sanity check correct output
-               if( mb == null ) {
-                       throw new IOException("Unable to load matrix from rdd: 
"+lrdd.getVarName());
-               }
+               if( mb == null )
+                       throw new IOException("Unable to load matrix from 
rdd.");
                
                return mb;
        }
@@ -520,7 +519,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
        {
                long begin = 0;
                if( LOG.isTraceEnabled() ){
-                       LOG.trace (" Writing matrix to HDFS...  " + getVarName() + "  Path: " + fname + ", Format: " +
+                       LOG.trace (" Writing matrix to HDFS...  " + hashCode() + "  Path: " + fname + ", Format: " +
                                                (ofmt != null ? ofmt : "inferred from metadata"));
                        begin = System.currentTimeMillis();
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index 3b9c075..67e91b0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -518,12 +518,9 @@ public class ExecutionContext {
                throws DMLRuntimeException 
        {
                FrameObject fo = getFrameObject(varName);
-               if( outputData.getNumColumns()>0 && 
outputData.getSchema()!=null )
-                       fo.setValueType(outputData.getSchema()[0]);
                fo.acquireModify(outputData);
                fo.release();
-                   
-           setVariable(varName, fo);
+               setVariable(varName, fo);
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index be95164..467b6fc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -369,7 +369,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
 
                        //keep rdd handle for future operations on it
-                       RDDObject rddhandle = new RDDObject(rdd, 
mo.getVarName());
+                       RDDObject rddhandle = new RDDObject(rdd);
                        rddhandle.setHDFSFile(fromFile);
                        mo.setRDDHandle(rddhandle);
                }
@@ -397,7 +397,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
 
                        //keep rdd handle for future operations on it
-                       RDDObject rddhandle = new RDDObject(rdd, 
mo.getVarName());
+                       RDDObject rddhandle = new RDDObject(rdd);
                        rddhandle.setHDFSFile(true);
                        mo.setRDDHandle(rddhandle);
                }
@@ -461,7 +461,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
 
                        //keep rdd handle for future operations on it
-                       RDDObject rddhandle = new RDDObject(rdd, 
fo.getVarName());
+                       RDDObject rddhandle = new RDDObject(rdd);
                        rddhandle.setHDFSFile(fromFile);
                        fo.setRDDHandle(rddhandle);
                }
@@ -488,7 +488,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
 
                        //keep rdd handle for future operations on it
-                       RDDObject rddhandle = new RDDObject(rdd, 
fo.getVarName());
+                       RDDObject rddhandle = new RDDObject(rdd);
                        rddhandle.setHDFSFile(true);
                        fo.setRDDHandle(rddhandle);
                }
@@ -560,7 +560,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
                        
                        bret = new PartitionedBroadcast<>(ret);
-                       BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<>(bret, varname,
+                       BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<>(bret,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
                        mo.setBroadcastHandle(bchandle);
                        CacheableData.addBroadcastSize(bchandle.getSize());
@@ -630,7 +630,7 @@ public class SparkExecutionContext extends ExecutionContext
                        }
 
                        bret = new PartitionedBroadcast<>(ret);
-                       BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<>(bret, varname,
+                       BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<>(bret,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
                        fo.setBroadcastHandle(bchandle);
                        CacheableData.addBroadcastSize(bchandle.getSize());
@@ -656,7 +656,7 @@ public class SparkExecutionContext extends ExecutionContext
                throws DMLRuntimeException
        {
                CacheableData<?> obj = getCacheableData(varname);
-               RDDObject rddhandle = new RDDObject(rdd, varname);
+               RDDObject rddhandle = new RDDObject(rdd);
                obj.setRDDHandle( rddhandle );
        }
 
@@ -1236,10 +1236,10 @@ public class SparkExecutionContext extends ExecutionContext
                   .count(); //trigger caching to prevent contention
 
                //create new rdd handle, in-place of current matrix object
-               RDDObject inro =  mo.getRDDHandle();       //guaranteed to exist (see above)
-               RDDObject outro = new RDDObject(out, var); //create new rdd object
-               outro.setCheckpointRDD(true);              //mark as checkpointed
-               outro.addLineageChild(inro);               //keep lineage to prevent cycles on cleanup
+               RDDObject inro =  mo.getRDDHandle();  //guaranteed to exist (see above)
+               RDDObject outro = new RDDObject(out); //create new rdd object
+               outro.setCheckpointRDD(true);         //mark as checkpointed
+               outro.addLineageChild(inro);          //keep lineage to prevent cycles on cleanup
                mo.setRDDHandle(outro);
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
index 69d2a70..afb2c71 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitioner.java
@@ -23,8 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.sysml.hops.Hop;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -71,12 +69,7 @@ public abstract class DataPartitioner
        public MatrixObject createPartitionedMatrixObject( MatrixObject in, 
String fnameNew, boolean force )
                throws DMLRuntimeException
        {
-               ValueType vt = in.getValueType();
-               String varname = in.getVarName();
-               MatrixObject out = new MatrixObject(vt, fnameNew );
-               out.setDataType( DataType.MATRIX );
-               out.setVarName( varname+NAME_SUFFIX );          
-               
+               MatrixObject out = new MatrixObject(in.getValueType(), 
fnameNew);
                return createPartitionedMatrixObject(in, out, force);
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
index 05872b1..281ce07 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ParWorker.java
@@ -145,7 +145,7 @@ public abstract class ParWorker
                throws DMLRuntimeException 
        {
                //monitoring start
-               Timing time1 = null, time2 = null;              
+               Timing time1 = null, time2 = null;
                if( _monitor )
                {
                        time1 = new Timing(true); 
@@ -155,12 +155,13 @@ public abstract class ParWorker
                //core execution
 
                //foreach iteration in task, execute iteration body
+               String lVarName = task.getVarName();
                for( IntObject indexVal : task.getIterations() )
                {
                        //System.out.println(" EXECUTE ITERATION: 
"+indexVal.getName()+"="+indexVal.getIntValue());
                        
                        //set index values
-                       _ec.setVariable(indexVal.getName(), indexVal);
+                       _ec.setVariable(lVarName, indexVal);
                        
                        // for each program block
                        for (ProgramBlock pb : _childBlocks)
@@ -186,7 +187,7 @@ public abstract class ParWorker
                throws DMLRuntimeException 
        {
                //monitoring start
-               Timing time1 = null, time2 = null;              
+               Timing time1 = null, time2 = null;
                if( _monitor )
                {
                        time1 = new Timing(true); 
@@ -195,7 +196,7 @@ public abstract class ParWorker
                
                //core execution
                List<IntObject> tmp = task.getIterations();
-               String lVarName = tmp.get(0).getName();
+               String lVarName = task.getVarName();
                long lFrom      = tmp.get(0).getLongValue();
                long lTo        = tmp.get(1).getLongValue();
                long lIncr      = tmp.get(2).getLongValue();
@@ -203,7 +204,7 @@ public abstract class ParWorker
                for( long i=lFrom; i<=lTo; i+=lIncr )
                {
                        //set index values
-                       _ec.setVariable(lVarName, new IntObject(lVarName,i));
+                       _ec.setVariable(lVarName, new IntObject(i));
                        
                        // for each program block
                        for (ProgramBlock pb : _childBlocks)

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 860c7b6..3e4568b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -1718,19 +1718,16 @@ public class ProgramConverter
                                switch ( valuetype )
                                {
                                        case INT:
-                                               long value1 = 
Long.parseLong(valString);
-                                               dat = new 
IntObject(name,value1);
+                                               dat = new 
IntObject(Long.parseLong(valString));
                                                break;
                                        case DOUBLE:
-                                               double value2 = 
Double.parseDouble(valString);
-                                               dat = new 
DoubleObject(name,value2);
+                                               dat = new 
DoubleObject(Double.parseDouble(valString));
                                                break;
                                        case BOOLEAN:
-                                               boolean value3 = 
Boolean.parseBoolean(valString);
-                                               dat = new 
BooleanObject(name,value3);
+                                               dat = new 
BooleanObject(Boolean.parseBoolean(valString));
                                                break;
                                        case STRING:
-                                               dat = new 
StringObject(name,valString);
+                                               dat = new 
StringObject(valString);
                                                break;
                                        default:
                                                throw new 
DMLRuntimeException("Unable to parse valuetype "+valuetype);
@@ -1752,7 +1749,6 @@ public class ProgramConverter
                                MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, cols, brows, bcols, nnz); 
                                MetaDataFormat md = new MetaDataFormat( mc, 
oin, iin );
                                mo.setMetaData( md );
-                               mo.setVarName( name );
                                if( partFormat._dpf != 
PDataPartitionFormat.NONE )
                                        mo.setPartitioned( partFormat._dpf, 
partFormat._N );
                                mo.setUpdateType(inplace);

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 367cc8b..9f2658c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -121,8 +121,8 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                        mo.setInMemoryPartition( partition );
                        
                        //create tasks for input data
-                       Task lTask = new Task(TaskType.SET);
-                       lTask.addIteration( new IntObject(_iterVar, larg._1()) 
);
+                       Task lTask = new Task(_iterVar, TaskType.SET);
+                       lTask.addIteration( new IntObject(larg._1()) );
                        
                        //execute program
                        long numIter = getExecutedIterations();

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index 3dda64c..4c05791 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -99,8 +99,8 @@ public class RemoteDPParWorkerReducer extends ParWorker
                        mo.setInMemoryPartition( _partition );
                        
                        //create tasks for input data
-                       Task lTask = new Task(TaskType.SET);
-                       lTask.addIteration( new IntObject(_iterVar,key.get()) );
+                       Task lTask = new Task(_iterVar, TaskType.SET);
+                       lTask.addIteration( new IntObject(key.get()) );
                        
                        //execute program
                        executeTask( lTask );

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 253883b..f245cbf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -92,17 +90,13 @@ public class ResultMergeLocalFile extends ResultMerge
                throws DMLRuntimeException 
        {
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
-
-               //Timing time = null;
-               LOG.trace("ResultMerge (local, file): Execute serial merge for 
output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-               //      time = new Timing();
-               //      time.start();
-
+               
+               if( LOG.isTraceEnabled() )
+               LOG.trace("ResultMerge (local, file): Execute serial merge for output "
+                       +_output.hashCode()+" (fname="+_output.getFileName()+")");
                
                try
                {
-                       
-                       
                        //collect all relevant inputs
                        ArrayList<MatrixObject> inMO = new ArrayList<>();
                        for( MatrixObject in : _inputs )
@@ -155,13 +149,8 @@ public class ResultMergeLocalFile extends ResultMerge
        private MatrixObject createNewMatrixObject(MatrixObject output, 
ArrayList<MatrixObject> inMO ) 
                throws DMLRuntimeException
        {
-               String varName = _output.getVarName();
-               ValueType vt = _output.getValueType();
                MetaDataFormat metadata = (MetaDataFormat) 
_output.getMetaData();
-               
-               MatrixObject moNew = new MatrixObject( vt, _outputFName );
-               moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : 
varName+NAME_SUFFIX );
-               moNew.setDataType( DataType.MATRIX );
+               MatrixObject moNew = new MatrixObject( _output.getValueType(), 
_outputFName );
                
                //create deep copy of metadata obj
                MatrixCharacteristics mcOld = 
metadata.getMatrixCharacteristics();
@@ -231,7 +220,9 @@ public class ResultMergeLocalFile extends ResultMerge
                        {
                                for( MatrixObject in : inMO ) //read/write all 
inputs
                                {
-                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace("ResultMerge (local, 
file): Merge input "+in.hashCode()+" (fname="
+                                                       +in.getFileName()+") 
via stream merge");
                                        
                                        JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf());
                                        Path tmpPath = new 
Path(in.getFileName());
@@ -282,13 +273,16 @@ public class ResultMergeLocalFile extends ResultMerge
                        MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
                        
                        //Step 0) write compare blocks to staging area (if 
necessary)
-                       LOG.trace("ResultMerge (local, file): Create merge 
compare matrix for output "+outMo.getVarName()+" 
(fname="+outMo.getFileName()+")");
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("ResultMerge (local, file): Create 
merge compare matrix for output "
+                                       +outMo.hashCode()+" 
(fname="+outMo.getFileName()+")");
                        createTextCellStagingFile(fnameStagingCompare, outMo, 
0);
                        
                        //Step 1) read and write blocks to staging area
                        for( MatrixObject in : inMO )
                        {
-                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.getVarName()+" (fname="+in.getFileName()+")");
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                
                                long ID = _seq.getNextID();
                                createTextCellStagingFile( fnameStaging, in, ID 
);
@@ -334,7 +328,9 @@ public class ResultMergeLocalFile extends ResultMerge
                        {
                                for( MatrixObject in : inMO ) //read/write all 
inputs
                                {
-                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace("ResultMerge (local, 
file): Merge input "
+                                                       +in.hashCode()+" 
(fname="+in.getFileName()+") via stream merge");
                                        
                                        JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf());
                                        Path tmpPath = new 
Path(in.getFileName());
@@ -377,13 +373,16 @@ public class ResultMergeLocalFile extends ResultMerge
                        MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
                        
                        //Step 0) write compare blocks to staging area (if 
necessary)
-                       LOG.trace("ResultMerge (local, file): Create merge 
compare matrix for output "+outMo.getVarName()+" 
(fname="+outMo.getFileName()+")");
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("ResultMerge (local, file): Create 
merge compare matrix for output "
+                                       +outMo.hashCode()+" 
(fname="+outMo.getFileName()+")");
                        createBinaryCellStagingFile(fnameStagingCompare, outMo, 
0);
                        
                        //Step 1) read and write blocks to staging area
                        for( MatrixObject in : inMO )
                        {
-                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.getVarName()+" (fname="+in.getFileName()+")");
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                
                                long ID = _seq.getNextID();
                                createBinaryCellStagingFile( fnameStaging, in, 
ID );
@@ -414,7 +413,8 @@ public class ResultMergeLocalFile extends ResultMerge
                        //Step 1) read and write blocks to staging area
                        for( MatrixObject in : inMO )
                        {
-                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.getVarName()+" (fname="+in.getFileName()+")");                       
    
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                
                                createBinaryBlockStagingFile( fnameStaging, in 
);
                        }
@@ -442,14 +442,17 @@ public class ResultMergeLocalFile extends ResultMerge
                        MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
                        
                        //Step 0) write compare blocks to staging area (if 
necessary)
-                       LOG.trace("ResultMerge (local, file): Create merge 
compare matrix for output "+outMo.getVarName()+" 
(fname="+outMo.getFileName()+")");                  
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("ResultMerge (local, file): Create 
merge compare matrix for output "
+                                       +outMo.hashCode()+" 
(fname="+outMo.getFileName()+")");
                        
                        createBinaryBlockStagingFile(fnameStagingCompare, 
outMo);
                        
                        //Step 1) read and write blocks to staging area
                        for( MatrixObject in : inMO )
                        {
-                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.getVarName()+" (fname="+in.getFileName()+")");           
+                               if( LOG.isTraceEnabled() )
+                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                createBinaryBlockStagingFile( fnameStaging, in 
);
                        }
        
@@ -1017,8 +1020,10 @@ public class ResultMergeLocalFile extends ResultMerge
                //merge in all input matrix objects
                IDSequence seq = new IDSequence();
                for( MatrixObject in : inMO )
-               {                       
-                       LOG.trace("ResultMerge (local, file): Merge input 
"+in.getVarName()+" (fname="+in.getFileName()+") via file rename.");
+               {
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.hashCode()
+                                       +" (fname="+in.getFileName()+") via 
file rename.");
                        
                        //copy over files (just rename file or entire dir)
                        Path tmpPath = new Path(in.getFileName());

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index 5fcfb26..a31294e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -21,7 +21,6 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.util.ArrayList;
 
-import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -56,9 +55,11 @@ public class ResultMergeLocalMemory extends ResultMerge
                throws DMLRuntimeException
        {
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
-
-               LOG.trace("ResultMerge (local, in-memory): Execute serial merge 
for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-                               
+               
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("ResultMerge (local, in-memory): Execute 
serial merge for output "
+                               +_output.hashCode()+" 
(fname="+_output.getFileName()+")");
+               
                try
                {
                        //get old output matrix from cache for compare
@@ -83,7 +84,8 @@ public class ResultMergeLocalMemory extends ResultMerge
                                //check for empty inputs (no iterations 
executed)
                                if( in != null && in != _output ) 
                                {
-                                       LOG.trace("ResultMerge (local, 
in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
+                                       if( LOG.isTraceEnabled() )
+                                               LOG.trace("ResultMerge (local, 
in-memory): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                        
                                        //read/pin input_i
                                        MatrixBlock inMB = in.acquireRead();    
@@ -145,7 +147,9 @@ public class ResultMergeLocalMemory extends ResultMerge
        {               
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
                
-               LOG.trace("ResultMerge (local, in-memory): Execute parallel 
(par="+par+") merge for output "+_output.getVarName()+" 
(fname="+_output.getFileName()+")");
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("ResultMerge (local, in-memory): Execute 
parallel (par="+par+") "
+                               + "merge for output "+_output.hashCode()+" 
(fname="+_output.getFileName()+")");
                
                try
                {
@@ -205,7 +209,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                        }
                        
                        //release old output, and all inputs
-                       _output.release();                      
+                       _output.release();
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
@@ -213,7 +217,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                
                //LOG.trace("ResultMerge (local, in-memory): Executed parallel 
(par="+par+") merge for output "+_output.getVarName()+" 
(fname="+_output.getFileName()+") in "+time.stop()+"ms");
 
-               return moNew;           
+               return moNew;
        }
 
        private static double[][] createCompareMatrix( MatrixBlock output ) {
@@ -226,13 +230,9 @@ public class ResultMergeLocalMemory extends ResultMerge
        private MatrixObject createNewMatrixObject( MatrixBlock data ) 
                throws DMLRuntimeException
        {
-               String varName = _output.getVarName();
                ValueType vt = _output.getValueType();
                MetaDataFormat metadata = (MetaDataFormat) 
_output.getMetaData();
-               
                MatrixObject moNew = new MatrixObject( vt, _outputFName );
-               moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : 
varName+NAME_SUFFIX );
-               moNew.setDataType( DataType.MATRIX );
                
                //create deep copy of metadata obj
                MatrixCharacteristics mcOld = 
metadata.getMatrixCharacteristics();
@@ -315,7 +315,7 @@ public class ResultMergeLocalMemory extends ResultMerge
                        //read each input if required
                        try
                        {
-                               LOG.trace("ResultMerge (local, in-memory): 
Merge input "+_inMO.getVarName()+" (fname="+_inMO.getFileName()+")");
+                               LOG.trace("ResultMerge (local, in-memory): 
Merge input "+_inMO.hashCode()+" (fname="+_inMO.getFileName()+")");
                                
                                MatrixBlock inMB = _inMO.acquireRead(); //incl. 
implicit read from HDFS
                                merge( _outMB, inMB, false );

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 76cbd40..7ea1543 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
@@ -99,7 +97,7 @@ public class ResultMergeRemoteMR extends ResultMerge
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
                if( LOG.isTraceEnabled() )
                        LOG.trace("ResultMerge (remote, mr): Execute serial 
merge for output "
-                               +_output.getVarName()+" 
(fname="+_output.getFileName()+")");
+                               +_output.hashCode()+" 
(fname="+_output.getFileName()+")");
                
                try
                {
@@ -135,11 +133,7 @@ public class ResultMergeRemoteMR extends ResultMerge
                                                     mcOld.getRowsPerBlock(), 
mcOld.getColsPerBlock());
                                
                                //create new output matrix (e.g., to prevent 
potential export<->read file access conflict
-                               String varName = _output.getVarName();
-                               ValueType vt = _output.getValueType();
-                               moNew = new MatrixObject( vt, _outputFName );
-                               moNew.setVarName( varName.contains(NAME_SUFFIX) 
? varName : varName+NAME_SUFFIX );
-                               moNew.setDataType( DataType.MATRIX );
+                               moNew = new 
MatrixObject(_output.getValueType(), _outputFName);
                                OutputInfo oiOld = metadata.getOutputInfo();
                                InputInfo iiOld = metadata.getInputInfo();
                                MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 9accfea..2b64bb2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -79,7 +77,9 @@ public class ResultMergeRemoteSpark extends ResultMerge
        {
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
 
-               LOG.trace("ResultMerge (remote, spark): Execute serial merge 
for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("ResultMerge (remote, spark): Execute serial 
merge for output "
+                               +_output.hashCode()+" 
(fname="+_output.getFileName()+")");
 
                try
                {
@@ -91,14 +91,10 @@ public class ResultMergeRemoteSpark extends ResultMerge
                                MatrixObject compare = (mcOld.getNonZeros()==0) 
? null : _output;
                                
                                //actual merge
-                               RDDObject ro = executeMerge(compare, _inputs, 
_output.getVarName(), mcOld.getRows(), mcOld.getCols(), 
mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
+                               RDDObject ro = executeMerge(compare, _inputs, 
mcOld.getRows(), mcOld.getCols(), mcOld.getRowsPerBlock(), 
mcOld.getColsPerBlock());
                                
                                //create new output matrix (e.g., to prevent 
potential export<->read file access conflict
-                               String varName = _output.getVarName();
-                               ValueType vt = _output.getValueType();
-                               moNew = new MatrixObject( vt, _outputFName );
-                               moNew.setVarName( varName.contains(NAME_SUFFIX) 
? varName : varName+NAME_SUFFIX );
-                               moNew.setDataType( DataType.MATRIX );
+                               moNew = new 
MatrixObject(_output.getValueType(), _outputFName);
                                OutputInfo oiOld = metadata.getOutputInfo();
                                InputInfo iiOld = metadata.getInputInfo();
                                MatrixCharacteristics mc = new 
MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
@@ -122,7 +118,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
        }
 
        @SuppressWarnings("unchecked")
-       protected RDDObject executeMerge(MatrixObject compare, MatrixObject[] 
inputs, String varname, long rlen, long clen, int brlen, int bclen)
+       protected RDDObject executeMerge(MatrixObject compare, MatrixObject[] 
inputs, long rlen, long clen, int brlen, int bclen)
                throws DMLRuntimeException 
        {
                String jobname = "ParFor-RMSP";
@@ -187,7 +183,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
                        }
                    
                        //Step 3: create output rdd handle w/ lineage
-                       ret = new RDDObject(out, varname);
+                       ret = new RDDObject(out);
                        for(int i=0; i<paths.length; i++)
                                ret.addLineageChild(inputs[i].getRDDHandle());
                        if( withCompare )
@@ -221,7 +217,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
                JavaSparkContext sc = sec.getSparkContext();
                JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = 
(JavaPairRDD<MatrixIndexes,MatrixBlock>) 
                        sc.hadoopFile( mo.getFileName(), 
iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
-               RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
+               RDDObject rddhandle = new RDDObject(rdd);
                rddhandle.setHDFSFile(true);
                mo.setRDDHandle(rddhandle);
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/Task.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/Task.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/Task.java
index 7edb3f2..d09fb89 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/Task.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/Task.java
@@ -47,21 +47,23 @@ public class Task implements Serializable
        public static final int MAX_VARNAME_SIZE  = 256;
        public static final int MAX_TASK_SIZE     = Integer.MAX_VALUE-1; 
        
-       private TaskType                  _type;
+       private String _iterVar;
+       private TaskType _type;
        private LinkedList<IntObject> _iterations; //each iteration is 
specified as an ordered set of index values
        
        public Task() {
                //default constructor for serialize
        }
        
-       public Task( TaskType type ) {
+       public Task( String iterVar, TaskType type ) {
+               if( iterVar.length() > MAX_VARNAME_SIZE )
+                       throw new RuntimeException("Cannot create task, 
MAX_VARNAME_SIZE exceeded.");
+               _iterVar = iterVar;
                _type = type;
                _iterations = new LinkedList<>();
        }
        
        public void addIteration( IntObject indexVal )  {
-               if( indexVal.getName().length() > MAX_VARNAME_SIZE )
-                       throw new RuntimeException("Cannot add iteration, 
MAX_VARNAME_SIZE exceeded.");
                if( size() >= MAX_TASK_SIZE )
                        throw new RuntimeException("Cannot add iteration, 
MAX_TASK_SIZE reached.");
                _iterations.addLast( indexVal );
@@ -75,6 +77,10 @@ public class Task implements Serializable
                return _type;
        }
        
+       public String getVarName() {
+               return _iterVar;
+       }
+       
        public int size() {
                return _iterations.size();
        }
@@ -96,7 +102,7 @@ public class Task implements Serializable
                        if( count!=0 ) 
                                sb.append(";");
                        sb.append("[");
-                       sb.append(dat.getName());
+                       sb.append(_iterVar);
                        sb.append("=");
                        sb.append(dat.getLongValue());
                        sb.append("]");
@@ -115,8 +121,7 @@ public class Task implements Serializable
                if( size() > 0 )
                {
                        sb.append(".");
-                       IntObject dat0 = _iterations.getFirst();
-                       sb.append(dat0.getName());
+                       sb.append(_iterVar);
                        sb.append(".{");
                
                        int count = 0;
@@ -142,8 +147,7 @@ public class Task implements Serializable
                if( size() > 0 )
                {
                        sb.append(".");
-                       IntObject dat0 = _iterations.getFirst();
-                       sb.append(dat0.getName());
+                       sb.append(_iterVar);
                        sb.append(".{");
                
                        int count = 0;
@@ -168,19 +172,18 @@ public class Task implements Serializable
        public static Task parseCompactString( String stask )
        {
                StringTokenizer st = new StringTokenizer( stask.trim(), "." );
-               
-               Task newTask = new Task( TaskType.valueOf(st.nextToken()) );
+               TaskType type = TaskType.valueOf(st.nextToken());
                String meta = st.nextToken();
+               Task newTask = new Task(meta, type);
                
                //iteration data
                String sdata = st.nextToken();
                sdata = sdata.substring(1,sdata.length()-1); // remove brackets
                StringTokenizer st2 = new StringTokenizer(sdata, ",");
-               while( st2.hasMoreTokens() )
-               {
+               while( st2.hasMoreTokens() ) {
                        //create new iteration
                        String lsdata = st2.nextToken();
-                       IntObject ldata = new IntObject(meta,Integer.parseInt( 
lsdata ) );
+                       IntObject ldata = new 
IntObject(Integer.parseInt(lsdata));
                        newTask.addIteration(ldata);
                }
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/ffefd8e6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
index 4a00037..547e607 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
@@ -80,23 +80,23 @@ public class TaskPartitionerFactoring extends TaskPartitioner
                                        break;
                                
                                //create new task and add to list of tasks
-                               Task lTask = new Task( type );
+                               Task lTask = new Task(_iterVarName, type);
                                tasks.addLast(lTask);
                                
                                // add iterations to task 
                                if( type == TaskType.SET ) {
                                        //value based tasks
                                        for( long k=0; k<K && i<=lTo; k++, 
i+=lIncr )
-                                               lTask.addIteration(new 
IntObject(_iterVarName, i));
+                                               lTask.addIteration(new 
IntObject(i));
                                }
                                else {
                                        //determine end of task
                                        long to = Math.min( i+(K-1)*lIncr, lTo 
);
                                        
                                        //range based tasks
-                                       lTask.addIteration(new 
IntObject(_iterVarName, i));         //from
-                                       lTask.addIteration(new 
IntObject(_iterVarName, to));    //to
-                                       lTask.addIteration(new 
IntObject(_iterVarName, lIncr)); //increment
+                                       lTask.addIteration(new IntObject(i));   
  //from
+                                       lTask.addIteration(new IntObject(to));  
  //to
+                                       lTask.addIteration(new 
IntObject(lIncr)); //increment
                                        i = to + lIncr;
                                }
                        }
@@ -138,16 +138,14 @@ public class TaskPartitionerFactoring extends TaskPartitioner
                                                break;
                                        
                                        //create new task and add to list of 
tasks
-                                       Task lTask = new Task( type );
+                                       Task lTask = new Task(_iterVarName, 
type);
                                        
                                        // add iterations to task 
                                        if( type == TaskType.SET ) 
                                        {
                                                //value based tasks
                                                for( long k=0; k<K && i<=lTo; 
k++, i+=lIncr )
-                                               {
-                                                       lTask.addIteration(new 
IntObject(_iterVarName, i));                             
-                                               }                               
+                                                       lTask.addIteration(new 
IntObject(i));
                                        }
                                        else 
                                        {
@@ -155,9 +153,9 @@ public class TaskPartitionerFactoring extends TaskPartitioner
                                                long to = Math.min( 
i+(K-1)*lIncr, lTo );
                                                
                                                //range based tasks
-                                               lTask.addIteration(new 
IntObject(_iterVarName, i));         //from
-                                               lTask.addIteration(new 
IntObject(_iterVarName, to));    //to
-                                               lTask.addIteration(new 
IntObject(_iterVarName, lIncr)); //increment
+                                               lTask.addIteration(new 
IntObject(i));       //from
+                                               lTask.addIteration(new 
IntObject(to));    //to
+                                               lTask.addIteration(new 
IntObject(lIncr));       //increment
                                                
                                                i = to + lIncr;
                                        }

Reply via email to