http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/project.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/project.dml b/scripts/utils/project.dml
index dc69bd0..ee6cd80 100644
--- a/scripts/utils/project.dml
+++ b/scripts/utils/project.dml
@@ -1,80 +1,80 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to project columns from input matrix.
-#
-# Parameters:
-#    X       : (input)  filename of data matrix
-#    P       : (input)  filename of 1-column projection matrix containing 
columnIDs
-#    o       : (output) filename of output matrix with projected columns
-#    exclude : (default FALSE) TRUE means P contains columnIds to be projected
-#                              FALSE means P contains columnsIDS to be excluded
-#    ofmt    : (default binary) format of output matrix
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/project.dml -nvargs 
X="/tmp/M.mtx" P="/tmp/P.mtx" o="/tmp/PX.mtx" 
-#
-# Assumptions:
-# The order of colIDs in P is preserved. Order of columns in result is same as 
order of columns in P.
-#      i.e. projecting columns 4 and 2 of X results in a matrix with columns 4 
and 2.
-# If P specifies the exclude list, then projected columns are order preserved.
-
-exclude = ifdef ($exclude, FALSE);
-ofmt = ifdef ($ofmt, "binary");
-
-X = read ($X)
-P = read ($P)
-
-# create projection matrix using projection list and sequence matrix, and pad 
with 0s. The size of
-# PP is nbrOfColsInX x nbrOfColsToKeep
-
-if (exclude==FALSE)
-{
-   # create projection matrix using projection list and sequence matrix, and 
pad with 0s. The size
-   # of PP is nbrOfColsInX x nbrOfColsToKeep
-   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P)) 
-
- } else {
-   # create new vector P with list of columns to keep using original vector P 
containing exclude
-   # columns. These are all small vector operations.
-   C = table(P, seq(1, nrow(P), 1))
-   E = rowSums(C);
-      
-   # Row pad w/ 0s
-   EE = matrix (0, rows=ncol(X), cols=1)
-   EE[1:nrow(E),1] = E
-
-   # Convert exclude column list to include column list, and create column 
indices
-   EE = ppred(EE, 0, "==")
-   EE = EE * seq(1, ncol(X), 1)
-   P = removeEmpty(target=EE, margin="rows")
-
-   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P))
-
-}
-
-# Perform projection using permutation matrix
-PX = X %*% PP
-
-# Write output
-write (PX, $o, format=ofmt)
-
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to project columns from input matrix.
+#
+# Parameters:
+#    X       : (input)  filename of data matrix
+#    P       : (input)  filename of 1-column projection matrix containing 
columnIDs
+#    o       : (output) filename of output matrix with projected columns
+#    exclude : (default FALSE) TRUE means P contains columnIDs to be excluded
+#                              FALSE means P contains columnIDs to be projected
+#    ofmt    : (default binary) format of output matrix
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/project.dml -nvargs 
X="/tmp/M.mtx" P="/tmp/P.mtx" o="/tmp/PX.mtx" 
+#
+# Assumptions:
+# The order of colIDs in P is preserved. Order of columns in result is same as 
order of columns in P.
+#      i.e. projecting columns 4 and 2 of X results in a matrix with columns 4 
and 2.
+# If P specifies the exclude list, then projected columns are order preserved.
+
+exclude = ifdef ($exclude, FALSE);
+ofmt = ifdef ($ofmt, "binary");
+
+X = read ($X)
+P = read ($P)
+
+# create projection matrix using projection list and sequence matrix, and pad 
with 0s. The size of
+# PP is nbrOfColsInX x nbrOfColsToKeep
+
+if (exclude==FALSE)
+{
+   # create projection matrix using projection list and sequence matrix, and 
pad with 0s. The size
+   # of PP is nbrOfColsInX x nbrOfColsToKeep
+   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P)) 
+
+ } else {
+   # create new vector P with list of columns to keep using original vector P 
containing exclude
+   # columns. These are all small vector operations.
+   C = table(P, seq(1, nrow(P), 1))
+   E = rowSums(C);
+      
+   # Row pad w/ 0s
+   EE = matrix (0, rows=ncol(X), cols=1)
+   EE[1:nrow(E),1] = E
+
+   # Convert exclude column list to include column list, and create column 
indices
+   EE = ppred(EE, 0, "==")
+   EE = EE * seq(1, ncol(X), 1)
+   P = removeEmpty(target=EE, margin="rows")
+
+   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P))
+
+}
+
+# Perform projection using permutation matrix
+PX = X %*% PP
+
+# Write output
+write (PX, $o, format=ofmt)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/rowIndexMax.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/rowIndexMax.dml b/scripts/utils/rowIndexMax.dml
index 80af2e1..1e5cbc7 100644
--- a/scripts/utils/rowIndexMax.dml
+++ b/scripts/utils/rowIndexMax.dml
@@ -1,38 +1,38 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to return for each row the column nbr with the largest value. 
If all the values in
-# a row are the same, then the largest column nbr is returned.
-#
-# Parameters:
-#    I       : (input)  filename of input
-#    O       : (output) filename of output
-#    ofmt    : default "csv"; format of O: "csv", "binary"
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/rowIndexMax.dml -nvargs 
I="/tmp/X.mtx" O="/tmp/X2.mtx"
-#
-
-ofmt = ifdef($ofmt, "csv")
-
-M = read($I)
-C = rowIndexMax(M)
-write(C, $O, format=ofmt)
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to return for each row the column nbr with the largest value. 
If all the values in
+# a row are the same, then the largest column nbr is returned.
+#
+# Parameters:
+#    I       : (input)  filename of input
+#    O       : (output) filename of output
+#    ofmt    : default "csv"; format of O: "csv", "binary"
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/rowIndexMax.dml -nvargs 
I="/tmp/X.mtx" O="/tmp/X2.mtx"
+#
+
+ofmt = ifdef($ofmt, "csv")
+
+M = read($I)
+C = rowIndexMax(M)
+write(C, $O, format=ofmt)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/splitXY.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/splitXY.dml b/scripts/utils/splitXY.dml
index 7d5fc24..82027a4 100644
--- a/scripts/utils/splitXY.dml
+++ b/scripts/utils/splitXY.dml
@@ -1,62 +1,62 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to split X into new X and Y.
-#
-# Parameters:
-#    X       : (input)  filename of data matrix
-#    y       : (default ncol(X))  colIndex
-#    OX      : (output) filename of output matrix with all columns except y
-#    OY      : (output) filename of output matrix with y column
-#    ofmt    : (default binary) format of OX and OY output matrix
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/splitXY.dml -nvargs 
X="/tmp/X.mtx" y=50 OX="/tmp/OX.mtx  OY="/tmp/OY.mtx  
-#
-
-ofmt = ifdef($ofmt, "binary")
-y = ifdef($y, ncol($X))
-
-X = read ($X)
-
-if (y == 1)
-{
-   OX = X[,y+1:ncol(X)]
-   OY = X[,y]
-} 
-else if (y == ncol(X))
-{
-   OX = X[,1:y-1]
-   OY = X[,y]
-} 
-else 
-{
-   OX1 = X[,1:y-1]
-   OX2 = X[,y+1:ncol(X)]
-   OX = append (OX1, OX2)
-   OY = X[,y]
-}
-
-# Write output
-write (OX, $OX, format=ofmt)
-write (OY, $OY, format=ofmt)
-
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to split X into new X and Y.
+#
+# Parameters:
+#    X       : (input)  filename of data matrix
+#    y       : (default ncol(X))  colIndex
+#    OX      : (output) filename of output matrix with all columns except y
+#    OY      : (output) filename of output matrix with y column
+#    ofmt    : (default binary) format of OX and OY output matrix
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/splitXY.dml -nvargs 
X="/tmp/X.mtx" y=50 OX="/tmp/OX.mtx" OY="/tmp/OY.mtx"
+#
+
+ofmt = ifdef($ofmt, "binary")
+y = ifdef($y, ncol($X))
+
+X = read ($X)
+
+if (y == 1)
+{
+   OX = X[,y+1:ncol(X)]
+   OY = X[,y]
+} 
+else if (y == ncol(X))
+{
+   OX = X[,1:y-1]
+   OY = X[,y]
+} 
+else 
+{
+   OX1 = X[,1:y-1]
+   OX2 = X[,y+1:ncol(X)]
+   OX = append (OX1, OX2)
+   OY = X[,y]
+}
+
+# Write output
+write (OX, $OX, format=ofmt)
+write (OY, $OY, format=ofmt)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/write.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/write.dml b/scripts/utils/write.dml
index 7861a3a..f1c81e4 100644
--- a/scripts/utils/write.dml
+++ b/scripts/utils/write.dml
@@ -1,39 +1,39 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to change format of X.
-#
-# Parameters:
-#    I       : (input)  filename of input
-#    O       : (output) filename of output
-#    ofmt    : format of O: "csv", "binary"
-#    sep     : default ","; CSV separator in output
-#    header  : default "FALSE"; CSV header: TRUE | FALSE
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/write.dml -nvargs 
I="/tmp/X.mtx" O="/tmp/X2.mtx" ofmt="binary" sep="|" header=TRUE
-#
-
-sep = ifdef($sep, ",")
-header = ifdef($header, FALSE )
-
-M = read($I)
-write(M, $O, format=$ofmt, sep=sep, header=header)
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to change format of X.
+#
+# Parameters:
+#    I       : (input)  filename of input
+#    O       : (output) filename of output
+#    ofmt    : format of O: "csv", "binary"
+#    sep     : default ","; CSV separator in output
+#    header  : default "FALSE"; CSV header: TRUE | FALSE
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/write.dml -nvargs 
I="/tmp/X.mtx" O="/tmp/X2.mtx" ofmt="binary" sep="|" header=TRUE
+#
+
+sep = ifdef($sep, ",")
+header = ifdef($header, FALSE )
+
+M = read($I)
+write(M, $O, format=$ofmt, sep=sep, header=header)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/lops/BinaryScalar.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/BinaryScalar.java 
b/src/main/java/org/apache/sysml/lops/BinaryScalar.java
index 0f423e3..f61c145 100644
--- a/src/main/java/org/apache/sysml/lops/BinaryScalar.java
+++ b/src/main/java/org/apache/sysml/lops/BinaryScalar.java
@@ -1,197 +1,197 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.lops;
-
-
- 
-import org.apache.sysml.lops.LopProperties.ExecLocation;
-import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.lops.compile.JobType;
-import org.apache.sysml.parser.Expression.*;
-
-/**
- * Lop to perform binary scalar operations. Both inputs must be scalars.
- * Example i = j + k, i = i + 1. 
- */
-
-public class BinaryScalar extends Lop 
-{      
-       
-       public enum OperationTypes {
-               ADD, SUBTRACT, SUBTRACTRIGHT, MULTIPLY, DIVIDE, MODULUS, INTDIV,
-               LESS_THAN, LESS_THAN_OR_EQUALS, GREATER_THAN, 
GREATER_THAN_OR_EQUALS, EQUALS, NOT_EQUALS,
-               AND, OR, 
-               LOG,POW,MAX,MIN,PRINT,
-               IQSIZE,
-               Over,
-       }
-       
-       OperationTypes operation;
-
-       /**
-        * This overloaded constructor is used for setting exec type in case of 
spark backend
-        */
-       public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType 
dt, ValueType vt, ExecType et) 
-       {
-               super(Lop.Type.BinaryCP, dt, vt);               
-               operation = op;         
-               this.addInput(input1);
-               this.addInput(input2);
-               input1.addOutput(this);
-               input2.addOutput(this);
-
-               boolean breaksAlignment = false; // this field does not carry 
any meaning for this lop
-               boolean aligner = false;
-               boolean definesMRJob = false;
-               lps.addCompatibility(JobType.INVALID);
-               this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, 
breaksAlignment, aligner, definesMRJob );
-       }
-       
-       /**
-        * Constructor to perform a scalar operation
-        * @param input
-        * @param op
-        */
-
-       public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType 
dt, ValueType vt) 
-       {
-               super(Lop.Type.BinaryCP, dt, vt);               
-               operation = op;         
-               this.addInput(input1);
-               this.addInput(input2);
-               input1.addOutput(this);
-               input2.addOutput(this);
-
-               boolean breaksAlignment = false; // this field does not carry 
any meaning for this lop
-               boolean aligner = false;
-               boolean definesMRJob = false;
-               lps.addCompatibility(JobType.INVALID);
-               this.lps.setProperties(inputs, ExecType.CP, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
-       }
-
-       @Override
-       public String toString() {
-               return "Operation: " + operation;
-       }
-       
-       public OperationTypes getOperationType(){
-               return operation;
-       }
-
-       @Override
-       public String getInstructions(String input1, String input2, String 
output) throws LopsException
-       {
-               String opString = getOpcode( operation );
-               
-               
-               
-               StringBuilder sb = new StringBuilder();
-               
-               sb.append(getExecType());
-               sb.append(Lop.OPERAND_DELIMITOR);
-               
-               sb.append( opString );
-               sb.append( OPERAND_DELIMITOR );
-               
-               sb.append( 
getInputs().get(0).prepScalarInputOperand(getExecType()) );
-               sb.append( OPERAND_DELIMITOR );
-               
-               sb.append( 
getInputs().get(1).prepScalarInputOperand(getExecType()));
-               sb.append( OPERAND_DELIMITOR );
-               
-               sb.append( prepOutputOperand(output));
-
-               return sb.toString();
-       }
-       
-       @Override
-       public Lop.SimpleInstType getSimpleInstructionType()
-       {
-               switch (operation){
- 
-               default:
-                       return SimpleInstType.Scalar;
-               }
-       }
-       
-       /**
-        * 
-        * @param op
-        * @return
-        */
-       public static String getOpcode( OperationTypes op )
-       {
-               switch ( op ) 
-               {
-                       /* Arithmetic */
-                       case ADD:
-                               return "+";
-                       case SUBTRACT:
-                               return "-";
-                       case MULTIPLY:
-                               return "*";
-                       case DIVIDE:
-                               return "/";
-                       case MODULUS:
-                               return "%%";    
-                       case INTDIV:
-                               return "%/%";   
-                       case POW:       
-                               return "^";
-                               
-                       /* Relational */
-                       case LESS_THAN:
-                               return "<";
-                       case LESS_THAN_OR_EQUALS:
-                               return "<=";
-                       case GREATER_THAN:
-                               return ">";
-                       case GREATER_THAN_OR_EQUALS:
-                               return ">=";
-                       case EQUALS:
-                               return "==";
-                       case NOT_EQUALS:
-                               return "!=";
-                       
-                       /* Boolean */
-                       case AND:
-                               return "&&";
-                       case OR:
-                               return "||";
-                       
-                       /* Builtin Functions */
-                       case LOG:
-                               return "log";
-                       case MIN:
-                               return "min"; 
-                       case MAX:
-                               return "max"; 
-                       
-                       case PRINT:
-                               return "print";
-                               
-                       case IQSIZE:
-                               return "iqsize"; 
-                               
-                       default:
-                               throw new 
UnsupportedOperationException("Instruction is not defined for BinaryScalar 
operator: " + op);
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.lops;
+
+
+ 
+import org.apache.sysml.lops.LopProperties.ExecLocation;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.compile.JobType;
+import org.apache.sysml.parser.Expression.*;
+
+/**
+ * Lop to perform binary scalar operations. Both inputs must be scalars.
+ * Example i = j + k, i = i + 1. 
+ */
+
+public class BinaryScalar extends Lop 
+{      
+       
+       public enum OperationTypes {
+               ADD, SUBTRACT, SUBTRACTRIGHT, MULTIPLY, DIVIDE, MODULUS, INTDIV,
+               LESS_THAN, LESS_THAN_OR_EQUALS, GREATER_THAN, 
GREATER_THAN_OR_EQUALS, EQUALS, NOT_EQUALS,
+               AND, OR, 
+               LOG,POW,MAX,MIN,PRINT,
+               IQSIZE,
+               Over,
+       }
+       
+       OperationTypes operation;
+
+       /**
+        * This overloaded constructor is used for setting exec type in case of 
spark backend
+        */
+       public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType 
dt, ValueType vt, ExecType et) 
+       {
+               super(Lop.Type.BinaryCP, dt, vt);               
+               operation = op;         
+               this.addInput(input1);
+               this.addInput(input2);
+               input1.addOutput(this);
+               input2.addOutput(this);
+
+               boolean breaksAlignment = false; // this field does not carry 
any meaning for this lop
+               boolean aligner = false;
+               boolean definesMRJob = false;
+               lps.addCompatibility(JobType.INVALID);
+               this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, 
breaksAlignment, aligner, definesMRJob );
+       }
+       
+       /**
+        * Constructor to perform a scalar operation
+        * @param input
+        * @param op
+        */
+
+       public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType 
dt, ValueType vt) 
+       {
+               super(Lop.Type.BinaryCP, dt, vt);               
+               operation = op;         
+               this.addInput(input1);
+               this.addInput(input2);
+               input1.addOutput(this);
+               input2.addOutput(this);
+
+               boolean breaksAlignment = false; // this field does not carry 
any meaning for this lop
+               boolean aligner = false;
+               boolean definesMRJob = false;
+               lps.addCompatibility(JobType.INVALID);
+               this.lps.setProperties(inputs, ExecType.CP, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+       }
+
+       @Override
+       public String toString() {
+               return "Operation: " + operation;
+       }
+       
+       public OperationTypes getOperationType(){
+               return operation;
+       }
+
+       @Override
+       public String getInstructions(String input1, String input2, String 
output) throws LopsException
+       {
+               String opString = getOpcode( operation );
+               
+               
+               
+               StringBuilder sb = new StringBuilder();
+               
+               sb.append(getExecType());
+               sb.append(Lop.OPERAND_DELIMITOR);
+               
+               sb.append( opString );
+               sb.append( OPERAND_DELIMITOR );
+               
+               sb.append( 
getInputs().get(0).prepScalarInputOperand(getExecType()) );
+               sb.append( OPERAND_DELIMITOR );
+               
+               sb.append( 
getInputs().get(1).prepScalarInputOperand(getExecType()));
+               sb.append( OPERAND_DELIMITOR );
+               
+               sb.append( prepOutputOperand(output));
+
+               return sb.toString();
+       }
+       
+       @Override
+       public Lop.SimpleInstType getSimpleInstructionType()
+       {
+               switch (operation){
+ 
+               default:
+                       return SimpleInstType.Scalar;
+               }
+       }
+       
+       /**
+        * 
+        * @param op
+        * @return
+        */
+       public static String getOpcode( OperationTypes op )
+       {
+               switch ( op ) 
+               {
+                       /* Arithmetic */
+                       case ADD:
+                               return "+";
+                       case SUBTRACT:
+                               return "-";
+                       case MULTIPLY:
+                               return "*";
+                       case DIVIDE:
+                               return "/";
+                       case MODULUS:
+                               return "%%";    
+                       case INTDIV:
+                               return "%/%";   
+                       case POW:       
+                               return "^";
+                               
+                       /* Relational */
+                       case LESS_THAN:
+                               return "<";
+                       case LESS_THAN_OR_EQUALS:
+                               return "<=";
+                       case GREATER_THAN:
+                               return ">";
+                       case GREATER_THAN_OR_EQUALS:
+                               return ">=";
+                       case EQUALS:
+                               return "==";
+                       case NOT_EQUALS:
+                               return "!=";
+                       
+                       /* Boolean */
+                       case AND:
+                               return "&&";
+                       case OR:
+                               return "||";
+                       
+                       /* Builtin Functions */
+                       case LOG:
+                               return "log";
+                       case MIN:
+                               return "min"; 
+                       case MAX:
+                               return "max"; 
+                       
+                       case PRINT:
+                               return "print";
+                               
+                       case IQSIZE:
+                               return "iqsize"; 
+                               
+                       default:
+                               throw new 
UnsupportedOperationException("Instruction is not defined for BinaryScalar 
operator: " + op);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/parser/dml/Dml.g4
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/dml/Dml.g4 
b/src/main/java/org/apache/sysml/parser/dml/Dml.g4
index 9d07dc9..bada11e 100644
--- a/src/main/java/org/apache/sysml/parser/dml/Dml.g4
+++ b/src/main/java/org/apache/sysml/parser/dml/Dml.g4
@@ -1,201 +1,201 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-grammar Dml;
-
-@header
-{
-       // Commenting the package name and explicitly passing it in build.xml 
to maintain compatibility with maven plugin
-    // package org.apache.sysml.parser.dml;
-}
-
-// DML Program is a list of expression
-// For now, we only allow global function definitions (not nested or inside a 
while block)
-dmlprogram: (blocks+=statement | functionBlocks+=functionStatement)* EOF;
-
-statement returns [ StatementInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new StatementInfo();
-} :
-    // ------------------------------------------
-    // ImportStatement
-    'source' '(' filePath = STRING ')'  'as' namespace=ID ';'*       # 
ImportStatement
-    | 'setwd'  '(' pathValue = STRING ')' ';'*                          # 
PathStatement
-    // ------------------------------------------
-    // Treat function call as AssignmentStatement or MultiAssignmentStatement
-    // For backward compatibility and also since the behavior of foo() * A + 
foo() ... where foo returns A
-    // Convert FunctionCallIdentifier(paramExprs, ..) -> source
-    | // TODO: Throw an informative error if user doesnot provide the optional 
assignment
-    ( targetList+=dataIdentifier ('='|'<-') )? name=ID '(' 
(paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* 
)? ')' ';'*  # FunctionCallAssignmentStatement
-    | '[' targetList+=dataIdentifier (',' targetList+=dataIdentifier)* ']' 
('='|'<-') name=ID '(' (paramExprs+=parameterizedExpression (',' 
paramExprs+=parameterizedExpression)* )? ')' ';'*  # 
FunctionCallMultiAssignmentStatement
-    // {notifyErrorListeners("Too many parentheses");}
-    // ------------------------------------------
-    // AssignmentStatement
-    | targetList+=dataIdentifier op=('<-'|'=') 'ifdef' '(' 
commandLineParam=dataIdentifier ','  source=expression ')' ';'*   # 
IfdefAssignmentStatement
-    | targetList+=dataIdentifier op=('<-'|'=') source=expression ';'*   # 
AssignmentStatement
-    // ------------------------------------------
-    // We don't support block statement
-    // | '{' body+=expression ';'* ( body+=expression ';'* )*  '}' # 
BlockStatement
-    // ------------------------------------------
-    // IfStatement
-    | 'if' '(' predicate=expression ')' (ifBody+=statement ';'* | '{' 
(ifBody+=statement ';'*)*  '}')  ('else' (elseBody+=statement ';'* | '{' 
(elseBody+=statement ';'*)*  '}'))?  # IfStatement
-    // ------------------------------------------
-    // ForStatement & ParForStatement
-    | 'for' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' 
parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' 
(body+=statement ';'* )*  '}')  # ForStatement
-    // Convert strictParameterizedExpression to HashMap<String, String> for 
parForParams
-    | 'parfor' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' 
parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' 
(body+=statement ';'*)*  '}')  # ParForStatement
-    | 'while' '(' predicate=expression ')' (body+=statement ';'* | '{' 
(body+=statement ';'*)* '}')  # WhileStatement
-    // ------------------------------------------
-;
-
-iterablePredicate returns [ ExpressionInfo info ]
-  @init {
-         // This actions occurs regardless of how many alternatives in this 
rule
-         $info = new ExpressionInfo();
-  } :
-    from=expression ':' to=expression #IterablePredicateColonExpression
-    | ID '(' from=expression ',' to=expression ',' increment=expression ')' 
#IterablePredicateSeqExpression
-    ;
-
-functionStatement returns [ StatementInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new StatementInfo();
-} :
-    // ------------------------------------------
-    // FunctionStatement & ExternalFunctionStatement
-    // small change: only allow typed arguments here ... instead of data 
identifier
-    name=ID ('<-'|'=') 'function' '(' ( inputParams+=typedArgNoAssign (',' 
inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( 
outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )? 
'{' (body+=statement ';'*)* '}' # InternalFunctionDefExpression
-    | name=ID ('<-'|'=') 'externalFunction' '(' ( 
inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 
'return' '(' ( outputParams+=typedArgNoAssign (',' 
outputParams+=typedArgNoAssign)* )? ')' )?   'implemented' 'in' '(' ( 
otherParams+=strictParameterizedKeyValueString (',' 
otherParams+=strictParameterizedKeyValueString)* )? ')' ';'*    # 
ExternalFunctionDefExpression
-    // ------------------------------------------
-;
-
-
-// Other data identifiers are typedArgNoAssign, parameterizedExpression and 
strictParameterizedExpression
-dataIdentifier returns [ ExpressionInfo dataInfo ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $dataInfo = new ExpressionInfo();
-       // $dataInfo.expr = new org.apache.sysml.parser.DataIdentifier();
-} :
-    // ------------------------------------------
-    // IndexedIdentifier
-    name=ID '[' (rowLower=expression (':' rowUpper=expression)?)? ',' 
(colLower=expression (':' colUpper=expression)?)? ']' # IndexedExpression
-    // ------------------------------------------
-    | ID                                            # 
SimpleDataIdentifierExpression
-    | COMMANDLINE_NAMED_ID                          # 
CommandlineParamExpression
-    | COMMANDLINE_POSITION_ID                       # 
CommandlinePositionExpression
-;
-expression returns [ ExpressionInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new ExpressionInfo();
-       // $info.expr = new 
org.apache.sysml.parser.BinaryExpression(org.apache.sysml.parser.Expression.BinaryOp.INVALID);
-} :
-    // ------------------------------------------
-    // BinaryExpression
-    // power
-    <assoc=right> left=expression op='^' right=expression  # PowerExpression
-    // unary plus and minus
-    | op=('-'|'+') left=expression                        # UnaryExpression
-    // sequence - since we are only using this into for
-    //| left=expression op=':' right=expression             # 
SequenceExpression
-    // matrix multiply
-    | left=expression op='%*%' right=expression           # MatrixMulExpression
-    // modulus and integer division
-    | left=expression op=('%/%' | '%%' ) right=expression # ModIntDivExpression
-    // arithmetic multiply and divide
-    | left=expression op=('*'|'/') right=expression       # MultDivExpression
-    // arithmetic addition and subtraction
-    | left=expression op=('+'|'-') right=expression       # AddSubExpression
-    // ------------------------------------------
-    // RelationalExpression
-    | left=expression op=('>'|'>='|'<'|'<='|'=='|'!=') right=expression # 
RelationalExpression
-    // ------------------------------------------
-    // BooleanExpression
-    // boolean not
-    | op='!' left=expression # BooleanNotExpression
-    // boolean and
-    | left=expression op=('&'|'&&') right=expression # BooleanAndExpression
-    // boolean or
-    | left=expression op=('|'|'||') right=expression # BooleanOrExpression
-
-    // ---------------------------------
-    // only applicable for builtin function expressions
-    | name=ID '(' (paramExprs+=parameterizedExpression (',' 
paramExprs+=parameterizedExpression)* )? ')' ';'*  # BuiltinFunctionExpression
-
-    // 4. Atomic
-    | '(' left=expression ')'                       # AtomicExpression
-
-    // Should you allow indexed expression here ?
-    // | '[' targetList+=expression (',' targetList+=expression)* ']'  # 
MultiIdExpression
-
-    // | BOOLEAN                                       # 
ConstBooleanIdExpression
-    | 'TRUE'                                        # ConstTrueExpression
-    | 'FALSE'                                       # ConstFalseExpression
-    | INT                                           # ConstIntIdExpression
-    | DOUBLE                                        # ConstDoubleIdExpression
-    | STRING                                        # ConstStringIdExpression
-    | dataIdentifier                                # DataIdExpression
-    // Special
-    // | 'NULL' | 'NA' | 'Inf' | 'NaN'
-;
-
-typedArgNoAssign : paramType=ml_type paramName=ID;
-parameterizedExpression : (paramName=ID '=')? paramVal=expression;
-strictParameterizedExpression : paramName=ID '=' paramVal=expression ;
-strictParameterizedKeyValueString : paramName=ID '=' paramVal=STRING ;
-ID : (ALPHABET (ALPHABET|DIGIT|'_')*  '::')? ALPHABET (ALPHABET|DIGIT|'_')*
-    // Special ID cases:
-   // | 'matrix' // --> This is a special case which causes lot of headache
-   | 'as.scalar' | 'as.matrix' | 'as.double' | 'as.integer' | 'as.logical' | 
'index.return' | 'lower.tail'
-;
-// Unfortunately, we have datatype name clashing with builtin function name: 
matrix :(
-// Therefore, ugly work around for checking datatype
-ml_type :  valueType | dataType '[' valueType ']';
-// Note to reduce number of keywords, these are case-sensitive,
-// To allow case-insenstive,  'int' becomes: ('i' | 'I') ('n' | 'N') ('t' | 
'T')
-valueType: 'int' | 'integer' | 'string' | 'boolean' | 'double'
-            | 'Int' | 'Integer' | 'String' | 'Boolean' | 'Double';
-dataType:
-        // 'scalar' # ScalarDataTypeDummyCheck
-        // |
-        ID # MatrixDataTypeCheck //{ if($ID.text.compareTo("matrix") != 0) { 
notifyErrorListeners("incorrect datatype"); } }
-        //|  'matrix' //---> See ID, this causes lot of headache
-        ;
-INT : DIGIT+  [Ll]?;
-// BOOLEAN : 'TRUE' | 'FALSE';
-DOUBLE: DIGIT+ '.' DIGIT* EXP? [Ll]?
-| DIGIT+ EXP? [Ll]?
-| '.' DIGIT+ EXP? [Ll]?
-;
-DIGIT: '0'..'9';
-ALPHABET : [a-zA-Z] ;
-fragment EXP : ('E' | 'e') ('+' | '-')? INT ;
-COMMANDLINE_NAMED_ID: '$' ALPHABET (ALPHABET|DIGIT|'_')*;
-COMMANDLINE_POSITION_ID: '$' DIGIT+;
-
-// supports single and double quoted string with escape characters
-STRING: '"' ( ESC | ~[\\"] )*? '"' | '\'' ( ESC | ~[\\'] )*? '\'';
-fragment ESC : '\\' [abtnfrv"'\\] ;
-// Comments, whitespaces and new line
-LINE_COMMENT : '#' .*? '\r'? '\n' -> skip ;
-MULTILINE_BLOCK_COMMENT : '/*' .*? '*/' -> skip ;
-WHITESPACE : (' ' | '\t' | '\r' | '\n')+ -> skip ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+grammar Dml;
+
+@header
+{
+       // Commenting the package name and explicitly passing it in build.xml 
to maintain compatibility with maven plugin
+    // package org.apache.sysml.parser.dml;
+}
+
+// A DML program is a list of statements and function definitions.
+// For now, we only allow global function definitions (not nested or inside a while block)
+dmlprogram: (blocks+=statement | functionBlocks+=functionStatement)* EOF;
+
+statement returns [ StatementInfo info ]
+@init {
+       // This action occurs regardless of how many alternatives this rule has
+       $info = new StatementInfo();
+} :
+    // ------------------------------------------
+    // ImportStatement
+    'source' '(' filePath = STRING ')'  'as' namespace=ID ';'*       # 
ImportStatement
+    | 'setwd'  '(' pathValue = STRING ')' ';'*                          # 
PathStatement
+    // ------------------------------------------
+    // Treat function call as AssignmentStatement or MultiAssignmentStatement
+    // For backward compatibility and also since the behavior of foo() * A + 
foo() ... where foo returns A
+    // Convert FunctionCallIdentifier(paramExprs, ..) -> source
+    | // TODO: Throw an informative error if the user does not provide the optional assignment
+    ( targetList+=dataIdentifier ('='|'<-') )? name=ID '(' 
(paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* 
)? ')' ';'*  # FunctionCallAssignmentStatement
+    | '[' targetList+=dataIdentifier (',' targetList+=dataIdentifier)* ']' 
('='|'<-') name=ID '(' (paramExprs+=parameterizedExpression (',' 
paramExprs+=parameterizedExpression)* )? ')' ';'*  # 
FunctionCallMultiAssignmentStatement
+    // {notifyErrorListeners("Too many parentheses");}
+    // ------------------------------------------
+    // AssignmentStatement
+    | targetList+=dataIdentifier op=('<-'|'=') 'ifdef' '(' 
commandLineParam=dataIdentifier ','  source=expression ')' ';'*   # 
IfdefAssignmentStatement
+    | targetList+=dataIdentifier op=('<-'|'=') source=expression ';'*   # 
AssignmentStatement
+    // ------------------------------------------
+    // We don't support block statement
+    // | '{' body+=expression ';'* ( body+=expression ';'* )*  '}' # 
BlockStatement
+    // ------------------------------------------
+    // IfStatement
+    | 'if' '(' predicate=expression ')' (ifBody+=statement ';'* | '{' 
(ifBody+=statement ';'*)*  '}')  ('else' (elseBody+=statement ';'* | '{' 
(elseBody+=statement ';'*)*  '}'))?  # IfStatement
+    // ------------------------------------------
+    // ForStatement & ParForStatement
+    | 'for' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' 
parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' 
(body+=statement ';'* )*  '}')  # ForStatement
+    // Convert strictParameterizedExpression to HashMap<String, String> for 
parForParams
+    | 'parfor' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' 
parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' 
(body+=statement ';'*)*  '}')  # ParForStatement
+    | 'while' '(' predicate=expression ')' (body+=statement ';'* | '{' 
(body+=statement ';'*)* '}')  # WhileStatement
+    // ------------------------------------------
+;
+
+iterablePredicate returns [ ExpressionInfo info ]
+  @init {
+         // This action occurs regardless of how many alternatives this rule has
+         $info = new ExpressionInfo();
+  } :
+    from=expression ':' to=expression #IterablePredicateColonExpression
+    | ID '(' from=expression ',' to=expression ',' increment=expression ')' 
#IterablePredicateSeqExpression
+    ;
+
+functionStatement returns [ StatementInfo info ]
+@init {
+       // This action occurs regardless of how many alternatives this rule has
+       $info = new StatementInfo();
+} :
+    // ------------------------------------------
+    // FunctionStatement & ExternalFunctionStatement
+    // small change: only allow typed arguments here ... instead of data 
identifier
+    name=ID ('<-'|'=') 'function' '(' ( inputParams+=typedArgNoAssign (',' 
inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( 
outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )? 
'{' (body+=statement ';'*)* '}' # InternalFunctionDefExpression
+    | name=ID ('<-'|'=') 'externalFunction' '(' ( 
inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 
'return' '(' ( outputParams+=typedArgNoAssign (',' 
outputParams+=typedArgNoAssign)* )? ')' )?   'implemented' 'in' '(' ( 
otherParams+=strictParameterizedKeyValueString (',' 
otherParams+=strictParameterizedKeyValueString)* )? ')' ';'*    # 
ExternalFunctionDefExpression
+    // ------------------------------------------
+;
+
+
+// Other data identifiers are typedArgNoAssign, parameterizedExpression and 
strictParameterizedExpression
+dataIdentifier returns [ ExpressionInfo dataInfo ]
+@init {
+       // This action occurs regardless of how many alternatives this rule has
+       $dataInfo = new ExpressionInfo();
+       // $dataInfo.expr = new org.apache.sysml.parser.DataIdentifier();
+} :
+    // ------------------------------------------
+    // IndexedIdentifier
+    name=ID '[' (rowLower=expression (':' rowUpper=expression)?)? ',' 
(colLower=expression (':' colUpper=expression)?)? ']' # IndexedExpression
+    // ------------------------------------------
+    | ID                                            # 
SimpleDataIdentifierExpression
+    | COMMANDLINE_NAMED_ID                          # 
CommandlineParamExpression
+    | COMMANDLINE_POSITION_ID                       # 
CommandlinePositionExpression
+;
+expression returns [ ExpressionInfo info ]
+@init {
+       // This action occurs regardless of how many alternatives this rule has
+       $info = new ExpressionInfo();
+       // $info.expr = new 
org.apache.sysml.parser.BinaryExpression(org.apache.sysml.parser.Expression.BinaryOp.INVALID);
+} :
+    // ------------------------------------------
+    // BinaryExpression
+    // power
+    <assoc=right> left=expression op='^' right=expression  # PowerExpression
+    // unary plus and minus
+    | op=('-'|'+') left=expression                        # UnaryExpression
+    // sequence - disabled here since it is only used in 'for' (see iterablePredicate)
+    //| left=expression op=':' right=expression             # 
SequenceExpression
+    // matrix multiply
+    | left=expression op='%*%' right=expression           # MatrixMulExpression
+    // modulus and integer division
+    | left=expression op=('%/%' | '%%' ) right=expression # ModIntDivExpression
+    // arithmetic multiply and divide
+    | left=expression op=('*'|'/') right=expression       # MultDivExpression
+    // arithmetic addition and subtraction
+    | left=expression op=('+'|'-') right=expression       # AddSubExpression
+    // ------------------------------------------
+    // RelationalExpression
+    | left=expression op=('>'|'>='|'<'|'<='|'=='|'!=') right=expression # 
RelationalExpression
+    // ------------------------------------------
+    // BooleanExpression
+    // boolean not
+    | op='!' left=expression # BooleanNotExpression
+    // boolean and
+    | left=expression op=('&'|'&&') right=expression # BooleanAndExpression
+    // boolean or
+    | left=expression op=('|'|'||') right=expression # BooleanOrExpression
+
+    // ---------------------------------
+    // only applicable for builtin function expressions
+    | name=ID '(' (paramExprs+=parameterizedExpression (',' 
paramExprs+=parameterizedExpression)* )? ')' ';'*  # BuiltinFunctionExpression
+
+    // 4. Atomic
+    | '(' left=expression ')'                       # AtomicExpression
+
+    // Should you allow indexed expression here ?
+    // | '[' targetList+=expression (',' targetList+=expression)* ']'  # 
MultiIdExpression
+
+    // | BOOLEAN                                       # 
ConstBooleanIdExpression
+    | 'TRUE'                                        # ConstTrueExpression
+    | 'FALSE'                                       # ConstFalseExpression
+    | INT                                           # ConstIntIdExpression
+    | DOUBLE                                        # ConstDoubleIdExpression
+    | STRING                                        # ConstStringIdExpression
+    | dataIdentifier                                # DataIdExpression
+    // Special
+    // | 'NULL' | 'NA' | 'Inf' | 'NaN'
+;
+
+typedArgNoAssign : paramType=ml_type paramName=ID;
+parameterizedExpression : (paramName=ID '=')? paramVal=expression;
+strictParameterizedExpression : paramName=ID '=' paramVal=expression ;
+strictParameterizedKeyValueString : paramName=ID '=' paramVal=STRING ;
+ID : (ALPHABET (ALPHABET|DIGIT|'_')*  '::')? ALPHABET (ALPHABET|DIGIT|'_')*
+    // Special ID cases:
+   // | 'matrix' // --> This is a special case which causes lot of headache
+   | 'as.scalar' | 'as.matrix' | 'as.double' | 'as.integer' | 'as.logical' | 
'index.return' | 'lower.tail'
+;
+// Unfortunately, the datatype name 'matrix' clashes with the builtin function name 'matrix' :(
+// Therefore, an ugly workaround is used for checking the datatype
+ml_type :  valueType | dataType '[' valueType ']';
+// Note: to reduce the number of keywords, these are case-sensitive.
+// To allow case-insensitive matching, 'int' would become: ('i' | 'I') ('n' | 'N') ('t' | 'T')
+valueType: 'int' | 'integer' | 'string' | 'boolean' | 'double'
+            | 'Int' | 'Integer' | 'String' | 'Boolean' | 'Double';
+dataType:
+        // 'scalar' # ScalarDataTypeDummyCheck
+        // |
+        ID # MatrixDataTypeCheck //{ if($ID.text.compareTo("matrix") != 0) { 
notifyErrorListeners("incorrect datatype"); } }
+        //|  'matrix' //---> See ID, this causes lot of headache
+        ;
+INT : DIGIT+  [Ll]?;
+// BOOLEAN : 'TRUE' | 'FALSE';
+DOUBLE: DIGIT+ '.' DIGIT* EXP? [Ll]?
+| DIGIT+ EXP? [Ll]?
+| '.' DIGIT+ EXP? [Ll]?
+;
+DIGIT: '0'..'9';
+ALPHABET : [a-zA-Z] ;
+fragment EXP : ('E' | 'e') ('+' | '-')? INT ;
+COMMANDLINE_NAMED_ID: '$' ALPHABET (ALPHABET|DIGIT|'_')*;
+COMMANDLINE_POSITION_ID: '$' DIGIT+;
+
+// supports single and double quoted string with escape characters
+STRING: '"' ( ESC | ~[\\"] )*? '"' | '\'' ( ESC | ~[\\'] )*? '\'';
+fragment ESC : '\\' [abtnfrv"'\\] ;
+// Comments, whitespaces and new line
+LINE_COMMENT : '#' .*? '\r'? '\n' -> skip ;
+MULTILINE_BLOCK_COMMENT : '/*' .*? '*/' -> skip ;
+WHITESPACE : (' ' | '\t' | '\r' | '\n')+ -> skip ;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index b2d37f9..77d5282 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -1,292 +1,292 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.utils.Statistics;
-import org.apache.sysml.yarn.DMLAppMasterUtils;
-
-/**
- * MR job class for submitting parfor remote MR jobs, controlling its 
execution and obtaining results.
- * 
- *
- */
-public class RemoteDPParForMR
-{
-       
-       protected static final Log LOG = 
LogFactory.getLog(RemoteDPParForMR.class.getName());
-       
-       /**
-        * 
-        * @param pfid
-        * @param program
-        * @param taskFile
-        * @param resultFile
-        * @param enableCPCaching 
-        * @param mode
-        * @param numMappers
-        * @param replication
-        * @return
-        * @throws DMLRuntimeException
-        */
-       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, String resultFile, MatrixObject input, 
-                                                          PDataPartitionFormat 
dpf, OutputInfo oi, boolean tSparseCol, //config params
-                                                          boolean 
enableCPCaching, int numReducers, int replication, int max_retry)  //opt params
-               throws DMLRuntimeException
-       {
-               RemoteParForJobReturn ret = null;
-               String jobname = "ParFor-DPEMR";
-               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               
-               JobConf job;
-               job = new JobConf( RemoteDPParForMR.class );
-               job.setJobName(jobname+pfid);
-               
-               //maintain dml script counters
-               Statistics.incrementNoOfCompiledMRJobs();
-       
-               try
-               {
-                       /////
-                       //configure the MR job
-               
-                       //set arbitrary CP program blocks that will perform in 
the reducers
-                       MRJobConfiguration.setProgramBlocks(job, program); 
-
-                       //enable/disable caching
-                       MRJobConfiguration.setParforCachingConfig(job, 
enableCPCaching);
-               
-                       //setup input matrix
-                       Path path = new Path( input.getFileName() );
-                       long rlen = input.getNumRows();
-                       long clen = input.getNumColumns();
-                       int brlen = (int) input.getNumRowsPerBlock();
-                       int bclen = (int) input.getNumColumnsPerBlock();
-                       MRJobConfiguration.setPartitioningInfo(job, rlen, clen, 
brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), 
itervar, matrixvar, tSparseCol);
-                       
job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
-                       FileInputFormat.setInputPaths(job, path);
-                       
-                       //set mapper and reducers classes
-                       job.setMapperClass(DataPartitionerRemoteMapper.class); 
-                       job.setReducerClass(RemoteDPParWorkerReducer.class); 
-                       
-                   //set output format
-                   job.setOutputFormat(SequenceFileOutputFormat.class);
-                   
-                   //set output path
-                   MapReduceTool.deleteFileIfExistOnHDFS(resultFile);
-                   FileOutputFormat.setOutputPath(job, new Path(resultFile));
-                   
-                       //set the output key, value schema
-                   
-                   //parfor partitioning outputs (intermediates)
-                   job.setMapOutputKeyClass(LongWritable.class);
-                   if( oi == OutputInfo.BinaryBlockOutputInfo )
-                       job.setMapOutputValueClass(PairWritableBlock.class); 
-                   else if( oi == OutputInfo.BinaryCellOutputInfo )
-                       job.setMapOutputValueClass(PairWritableCell.class);
-                   else 
-                       throw new DMLRuntimeException("Unsupported 
intermrediate output info: "+oi);
-                   //parfor exec output
-                   job.setOutputKeyClass(LongWritable.class);
-                       job.setOutputValueClass(Text.class);
-                       
-                       //////
-                       //set optimization parameters
-
-                       //set the number of mappers and reducers 
-                       job.setNumReduceTasks( numReducers );                   
-                       
-                       //disable automatic tasks timeouts and speculative task 
exec
-                       job.setInt("mapred.task.timeout", 0);                   
-                       job.setMapSpeculativeExecution(false);
-                       
-                       //set up preferred custom serialization framework for 
binary block format
-                       if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-                               
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-       
-                       //set up map/reduce memory configurations (if in AM 
context)
-                       DMLConfig config = ConfigurationManager.getConfig();
-                       DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
-                       
-                       //disable JVM reuse
-                       job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited 
-                       
-                       //set the replication factor for the results
-                       job.setInt("dfs.replication", replication);
-                       
-                       //set the max number of retries per map task
-                       //note: currently disabled to use cluster config
-                       //job.setInt("mapreduce.map.maxattempts", max_retry);
-                       
-                       //set unique working dir
-                       MRJobConfiguration.setUniqueWorkingDir(job);
-                       
-                       /////
-                       // execute the MR job                   
-                       RunningJob runjob = JobClient.runJob(job);
-                       
-                       // Process different counters 
-                       Statistics.incrementNoOfExecutedMRJobs();
-                       Group pgroup = 
runjob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
-                       int numTasks = (int)pgroup.getCounter( 
Stat.PARFOR_NUMTASKS.toString() );
-                       int numIters = (int)pgroup.getCounter( 
Stat.PARFOR_NUMITERS.toString() );
-                       if( DMLScript.STATISTICS && 
!InfrastructureAnalyzer.isLocalMode() ) {
-                               Statistics.incrementJITCompileTime( 
pgroup.getCounter( Stat.PARFOR_JITCOMPILE.toString() ) );
-                               Statistics.incrementJVMgcCount( 
pgroup.getCounter( Stat.PARFOR_JVMGC_COUNT.toString() ) );
-                               Statistics.incrementJVMgcTime( 
pgroup.getCounter( Stat.PARFOR_JVMGC_TIME.toString() ) );
-                               Group cgroup = 
runjob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
-                               
CacheStatistics.incrementMemHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_MEM.toString() ));
-                               
CacheStatistics.incrementFSBuffHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() ));
-                               
CacheStatistics.incrementFSHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_FS.toString() ));
-                               
CacheStatistics.incrementHDFSHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_HDFS.toString() ));
-                               
CacheStatistics.incrementFSBuffWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() ));
-                               
CacheStatistics.incrementFSWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_FS.toString() ));
-                               
CacheStatistics.incrementHDFSWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() ));
-                               
CacheStatistics.incrementAcquireRTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_ACQR.toString() ));
-                               
CacheStatistics.incrementAcquireMTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_ACQM.toString() ));
-                               
CacheStatistics.incrementReleaseTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_RLS.toString() ));
-                               
CacheStatistics.incrementExportTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_EXP.toString() ));
-                       }
-                               
-                       // read all files of result variables and prepare for 
return
-                       LocalVariableMap[] results = readResultFile(job, 
resultFile); 
-
-                       ret = new RemoteParForJobReturn(runjob.isSuccessful(), 
-                                                               numTasks, 
numIters, 
-                                                               results);       
-               }
-               catch(Exception ex)
-               {
-                       throw new DMLRuntimeException(ex);
-               }
-               finally
-               {
-                       // remove created files 
-                       try
-                       {
-                               MapReduceTool.deleteFileIfExistOnHDFS(new 
Path(resultFile), job);
-                       }
-                       catch(IOException ex)
-                       {
-                               throw new DMLRuntimeException(ex);
-                       }
-               }
-               
-               if( DMLScript.STATISTICS ){
-                       long t1 = System.nanoTime();
-                       Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, 
t1-t0);
-               }
-               
-               return ret;
-       }
-       
-
-       /**
-        * Result file contains hierarchy of workerID-resultvar(incl filename). 
We deduplicate
-        * on the workerID. Without JVM reuse each task refers to a unique 
workerID, so we
-        * will not find any duplicates. With JVM reuse, however, each slot 
refers to a workerID, 
-        * and there are duplicate filenames due to partial aggregation and 
overwrite of fname 
-        * (the RemoteParWorkerMapper ensures uniqueness of those files 
independent of the 
-        * runtime implementation). 
-        * 
-        * @param job 
-        * @param fname
-        * @return
-        * @throws DMLRuntimeException
-        */
-       @SuppressWarnings("deprecation")
-       public static LocalVariableMap [] readResultFile( JobConf job, String 
fname )
-               throws DMLRuntimeException, IOException
-       {
-               HashMap<Long,LocalVariableMap> tmp = new 
HashMap<Long,LocalVariableMap>();
-
-               FileSystem fs = FileSystem.get(job);
-               Path path = new Path(fname);
-               LongWritable key = new LongWritable(); //workerID
-               Text value = new Text();               //serialized var header 
(incl filename)
-               
-               int countAll = 0;
-               for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-               {
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(FileSystem.get(job),lpath,job);
-                       try
-                       {
-                               while( reader.next(key, value) )
-                               {
-                                       
//System.out.println("key="+key.get()+", value="+value.toString());
-                                       if( !tmp.containsKey( key.get() ) )
-                                       tmp.put(key.get(), new LocalVariableMap 
());       
-                                       Object[] dat = 
ProgramConverter.parseDataObject( value.toString() );
-                               tmp.get( key.get() ).put((String)dat[0], 
(Data)dat[1]);
-                               countAll++;
-                               }
-                       }       
-                       finally
-                       {
-                               if( reader != null )
-                                       reader.close();
-                       }
-               }               
-
-               LOG.debug("Num remote worker results (before deduplication): 
"+countAll);
-               LOG.debug("Num remote worker results: "+tmp.size());
-
-               //create return array
-               return tmp.values().toArray(new LocalVariableMap[0]);   
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.utils.Statistics;
+import org.apache.sysml.yarn.DMLAppMasterUtils;
+
+/**
+ * MR job class for submitting parfor remote MR jobs, controlling its 
execution and obtaining results.
+ * 
+ *
+ */
+public class RemoteDPParForMR
+{
+       
+       protected static final Log LOG = 
LogFactory.getLog(RemoteDPParForMR.class.getName());
+       
+       /**
+        * 
+        * @param pfid
+        * @param program
+        * @param taskFile
+        * @param resultFile
+        * @param enableCPCaching 
+        * @param mode
+        * @param numMappers
+        * @param replication
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, String resultFile, MatrixObject input, 
+                                                          PDataPartitionFormat 
dpf, OutputInfo oi, boolean tSparseCol, //config params
+                                                          boolean 
enableCPCaching, int numReducers, int replication, int max_retry)  //opt params
+               throws DMLRuntimeException
+       {
+               RemoteParForJobReturn ret = null;
+               String jobname = "ParFor-DPEMR";
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+               
+               JobConf job;
+               job = new JobConf( RemoteDPParForMR.class );
+               job.setJobName(jobname+pfid);
+               
+               //maintain dml script counters
+               Statistics.incrementNoOfCompiledMRJobs();
+       
+               try
+               {
+                       /////
+                       //configure the MR job
+               
+                       //set arbitrary CP program blocks that will perform in 
the reducers
+                       MRJobConfiguration.setProgramBlocks(job, program); 
+
+                       //enable/disable caching
+                       MRJobConfiguration.setParforCachingConfig(job, 
enableCPCaching);
+               
+                       //setup input matrix
+                       Path path = new Path( input.getFileName() );
+                       long rlen = input.getNumRows();
+                       long clen = input.getNumColumns();
+                       int brlen = (int) input.getNumRowsPerBlock();
+                       int bclen = (int) input.getNumColumnsPerBlock();
+                       MRJobConfiguration.setPartitioningInfo(job, rlen, clen, 
brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), 
itervar, matrixvar, tSparseCol);
+                       
job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
+                       FileInputFormat.setInputPaths(job, path);
+                       
+                       //set mapper and reducers classes
+                       job.setMapperClass(DataPartitionerRemoteMapper.class); 
+                       job.setReducerClass(RemoteDPParWorkerReducer.class); 
+                       
+                   //set output format
+                   job.setOutputFormat(SequenceFileOutputFormat.class);
+                   
+                   //set output path
+                   MapReduceTool.deleteFileIfExistOnHDFS(resultFile);
+                   FileOutputFormat.setOutputPath(job, new Path(resultFile));
+                   
+                       //set the output key, value schema
+                   
+                   //parfor partitioning outputs (intermediates)
+                   job.setMapOutputKeyClass(LongWritable.class);
+                   if( oi == OutputInfo.BinaryBlockOutputInfo )
+                       job.setMapOutputValueClass(PairWritableBlock.class); 
+                   else if( oi == OutputInfo.BinaryCellOutputInfo )
+                       job.setMapOutputValueClass(PairWritableCell.class);
+                   else 
+                       throw new DMLRuntimeException("Unsupported 
intermrediate output info: "+oi);
+                   //parfor exec output
+                   job.setOutputKeyClass(LongWritable.class);
+                       job.setOutputValueClass(Text.class);
+                       
+                       //////
+                       //set optimization parameters
+
+                       //set the number of mappers and reducers 
+                       job.setNumReduceTasks( numReducers );                   
+                       
+                       //disable automatic tasks timeouts and speculative task 
exec
+                       job.setInt("mapred.task.timeout", 0);                   
+                       job.setMapSpeculativeExecution(false);
+                       
+                       //set up preferred custom serialization framework for 
binary block format
+                       if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
+                               
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+       
+                       //set up map/reduce memory configurations (if in AM 
context)
+                       DMLConfig config = ConfigurationManager.getConfig();
+                       DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, 
config);
+                       
+                       //disable JVM reuse
+                       job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited 
+                       
+                       //set the replication factor for the results
+                       job.setInt("dfs.replication", replication);
+                       
+                       //set the max number of retries per map task
+                       //note: currently disabled to use cluster config
+                       //job.setInt("mapreduce.map.maxattempts", max_retry);
+                       
+                       //set unique working dir
+                       MRJobConfiguration.setUniqueWorkingDir(job);
+                       
+                       /////
+                       // execute the MR job                   
+                       RunningJob runjob = JobClient.runJob(job);
+                       
+                       // Process different counters 
+                       Statistics.incrementNoOfExecutedMRJobs();
+                       Group pgroup = 
runjob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
+                       int numTasks = (int)pgroup.getCounter( 
Stat.PARFOR_NUMTASKS.toString() );
+                       int numIters = (int)pgroup.getCounter( 
Stat.PARFOR_NUMITERS.toString() );
+                       if( DMLScript.STATISTICS && 
!InfrastructureAnalyzer.isLocalMode() ) {
+                               Statistics.incrementJITCompileTime( 
pgroup.getCounter( Stat.PARFOR_JITCOMPILE.toString() ) );
+                               Statistics.incrementJVMgcCount( 
pgroup.getCounter( Stat.PARFOR_JVMGC_COUNT.toString() ) );
+                               Statistics.incrementJVMgcTime( 
pgroup.getCounter( Stat.PARFOR_JVMGC_TIME.toString() ) );
+                               Group cgroup = 
runjob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
+                               
CacheStatistics.incrementMemHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_MEM.toString() ));
+                               
CacheStatistics.incrementFSBuffHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() ));
+                               
CacheStatistics.incrementFSHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_FS.toString() ));
+                               
CacheStatistics.incrementHDFSHits((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_HITS_HDFS.toString() ));
+                               
CacheStatistics.incrementFSBuffWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() ));
+                               
CacheStatistics.incrementFSWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_FS.toString() ));
+                               
CacheStatistics.incrementHDFSWrites((int)cgroup.getCounter( 
CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() ));
+                               
CacheStatistics.incrementAcquireRTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_ACQR.toString() ));
+                               
CacheStatistics.incrementAcquireMTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_ACQM.toString() ));
+                               
CacheStatistics.incrementReleaseTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_RLS.toString() ));
+                               
CacheStatistics.incrementExportTime(cgroup.getCounter( 
CacheStatistics.Stat.CACHE_TIME_EXP.toString() ));
+                       }
+                               
+                       // read all files of result variables and prepare for 
return
+                       LocalVariableMap[] results = readResultFile(job, 
resultFile); 
+
+                       ret = new RemoteParForJobReturn(runjob.isSuccessful(), 
+                                                               numTasks, 
numIters, 
+                                                               results);       
+               }
+               catch(Exception ex)
+               {
+                       throw new DMLRuntimeException(ex);
+               }
+               finally
+               {
+                       // remove created files 
+                       try
+                       {
+                               MapReduceTool.deleteFileIfExistOnHDFS(new 
Path(resultFile), job);
+                       }
+                       catch(IOException ex)
+                       {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+               
+               if( DMLScript.STATISTICS ){
+                       long t1 = System.nanoTime();
+                       Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, 
t1-t0);
+               }
+               
+               return ret;
+       }
+       
+
+       /**
+        * Result file contains hierarchy of workerID-resultvar(incl filename). 
We deduplicate
+        * on the workerID. Without JVM reuse each task refers to a unique 
workerID, so we
+        * will not find any duplicates. With JVM reuse, however, each slot 
refers to a workerID, 
+        * and there are duplicate filenames due to partial aggregation and 
overwrite of fname 
+        * (the RemoteParWorkerMapper ensures uniqueness of those files 
independent of the 
+        * runtime implementation). 
+        * 
+        * @param job 
+        * @param fname
+        * @return
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings("deprecation")
+       public static LocalVariableMap [] readResultFile( JobConf job, String 
fname )
+               throws DMLRuntimeException, IOException
+       {
+               HashMap<Long,LocalVariableMap> tmp = new 
HashMap<Long,LocalVariableMap>();
+
+               FileSystem fs = FileSystem.get(job);
+               Path path = new Path(fname);
+               LongWritable key = new LongWritable(); //workerID
+               Text value = new Text();               //serialized var header 
(incl filename)
+               
+               int countAll = 0;
+               for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+               {
+                       SequenceFile.Reader reader = new 
SequenceFile.Reader(FileSystem.get(job),lpath,job);
+                       try
+                       {
+                               while( reader.next(key, value) )
+                               {
+                                       
//System.out.println("key="+key.get()+", value="+value.toString());
+                                       if( !tmp.containsKey( key.get() ) )
+                                       tmp.put(key.get(), new LocalVariableMap 
());       
+                                       Object[] dat = 
ProgramConverter.parseDataObject( value.toString() );
+                               tmp.get( key.get() ).put((String)dat[0], 
(Data)dat[1]);
+                               countAll++;
+                               }
+                       }       
+                       finally
+                       {
+                               if( reader != null )
+                                       reader.close();
+                       }
+               }               
+
+               LOG.debug("Num remote worker results (before deduplication): 
"+countAll);
+               LOG.debug("Num remote worker results: "+tmp.size());
+
+               //create return array
+               return tmp.values().toArray(new LocalVariableMap[0]);   
+       }
+}

Reply via email to