Repository: incubator-systemml
Updated Branches:
  refs/heads/master 67f16c46e -> 1b4f1ec4d


[SYSTEMML-1228] Cleanup monitoring util and switch to Spark 2.1.0

Closes #376.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1b4f1ec4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1b4f1ec4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1b4f1ec4

Branch: refs/heads/master
Commit: 1b4f1ec4d983b97afab2f144fded7c73f3923768
Parents: 67f16c4
Author: Niketan Pansare <[email protected]>
Authored: Fri Feb 10 16:41:36 2017 -0800
Committer: Niketan Pansare <[email protected]>
Committed: Fri Feb 10 16:42:18 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../java/org/apache/sysml/api/MLContext.java    |  88 +--
 .../org/apache/sysml/api/MLContextProxy.java    |  38 --
 .../apache/sysml/api/mlcontext/MLContext.java   |  24 +-
 .../sysml/api/mlcontext/ScriptExecutor.java     |  74 ---
 .../api/monitoring/InstructionComparator.java   |  36 --
 .../apache/sysml/api/monitoring/Location.java   |  55 --
 .../api/monitoring/SparkMonitoringUtil.java     | 630 -------------------
 .../context/SparkExecutionContext.java          |  27 +-
 .../sysml/runtime/instructions/Instruction.java |  11 -
 .../runtime/instructions/cp/CPInstruction.java  |   4 -
 .../instructions/spark/SPInstruction.java       |  48 +-
 .../spark/functions/SparkListener.java          | 199 ------
 .../DataFrameRowFrameConversionTest.java        |   8 +-
 14 files changed, 10 insertions(+), 1234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f81557e..cd95e37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
        <properties>
                <hadoop.version>2.4.1</hadoop.version>
                <antlr.version>4.5.3</antlr.version>
-               <spark.version>2.0.2</spark.version>
+               <spark.version>2.1.0</spark.version>
                <scala.version>2.11.8</scala.version>
                <scala.binary.version>2.11</scala.binary.version>
                <scala.test.version>2.2.6</scala.test.version>

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java 
b/src/main/java/org/apache/sysml/api/MLContext.java
index 42fb018..520e51e 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -41,7 +41,6 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.jmlc.JMLCUtils;
 import org.apache.sysml.api.mlcontext.ScriptType;
-import org.apache.sysml.api.monitoring.SparkMonitoringUtil;
 import org.apache.sysml.conf.CompilerConfig;
 import org.apache.sysml.conf.CompilerConfig.ConfigType;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -75,7 +74,6 @@ import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ConvertStringToLongTextPair;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
-import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
@@ -207,19 +205,6 @@ public class MLContext {
        
        private Map<String, String> _additionalConfigs = new HashMap<String, 
String>();
        
-       // --------------------------------------------------
-       // _monitorUtils is set only when MLContext(sc, true)
-       private SparkMonitoringUtil _monitorUtils = null;
-       
-       /**
-        * Experimental API. Not supported in Python MLContext API.
-        * @return SparkMonitoringUtil the Spark monitoring util
-        */
-       public SparkMonitoringUtil getMonitoringUtil() {
-               return _monitorUtils;
-       }
-       // --------------------------------------------------
-       
        /**
         * Create an associated MLContext for given spark session.
         * @param sc SparkContext
@@ -1219,56 +1204,6 @@ public class MLContext {
        }
        
        // -------------------------------- Utility methods ends 
----------------------------------------------------------
-               
-       
-       // -------------------------------- Experimental API begins 
----------------------------------------------------------
-       /**
-        * Experimental api:
-        * Setting monitorPerformance to true adds additional overhead of 
storing state. So, use it only if necessary.
-        * @param sc SparkContext
-        * @param monitorPerformance if true, monitor performance, otherwise 
false
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public MLContext(SparkContext sc, boolean monitorPerformance) throws 
DMLRuntimeException {
-               initializeSpark(sc, monitorPerformance, false);
-       }
-       
-       /**
-        * Experimental api:
-        * Setting monitorPerformance to true adds additional overhead of 
storing state. So, use it only if necessary.
-        * @param sc JavaSparkContext
-        * @param monitorPerformance if true, monitor performance, otherwise 
false
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public MLContext(JavaSparkContext sc, boolean monitorPerformance) 
throws DMLRuntimeException {
-               initializeSpark(sc.sc(), monitorPerformance, false);
-       }
-       
-       /**
-        * Experimental api:
-        * Setting monitorPerformance to true adds additional overhead of 
storing state. So, use it only if necessary.
-        * @param sc SparkContext
-        * @param monitorPerformance if true, monitor performance, otherwise 
false
-        * @param setForcedSparkExecType set forced spark exec type
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public MLContext(SparkContext sc, boolean monitorPerformance, boolean 
setForcedSparkExecType) throws DMLRuntimeException {
-               initializeSpark(sc, monitorPerformance, setForcedSparkExecType);
-       }
-       
-       /**
-        * Experimental api:
-        * Setting monitorPerformance to true adds additional overhead of 
storing state. So, use it only if necessary.
-        * @param sc JavaSparkContext
-        * @param monitorPerformance if true, monitor performance, otherwise 
false
-        * @param setForcedSparkExecType set forced spark exec type
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public MLContext(JavaSparkContext sc, boolean monitorPerformance, 
boolean setForcedSparkExecType) throws DMLRuntimeException {
-               initializeSpark(sc.sc(), monitorPerformance, 
setForcedSparkExecType);
-       }
-       
-       // -------------------------------- Experimental API ends 
----------------------------------------------------------
        
        // -------------------------------- Private methods begins 
----------------------------------------------------------
        private boolean isRegisteredAsInput(String varName) {
@@ -1347,20 +1282,8 @@ public class MLContext {
                        DMLScript.rtplatform = RUNTIME_PLATFORM.SPARK;
                else
                        DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-               
-               if(monitorPerformance) {
-                       initializeSparkListener(sc);
-               }
        }
        
-       private void initializeSparkListener(SparkContext sc) throws 
DMLRuntimeException {
-               if(compareVersion(sc.version(), "1.4.0")  < 0 ) {
-                       throw new DMLRuntimeException("Expected spark version 
>= 1.4.0 for monitoring MLContext performance");
-               }
-               SparkListener sparkListener = new SparkListener(sc);
-               _monitorUtils = new SparkMonitoringUtil(sparkListener);
-               sc.addSparkListener(sparkListener);
-       }
        
        /**
         * Execute a script stored in a string.
@@ -1502,9 +1425,6 @@ public class MLContext {
                        
                        // Set active MLContext.
                        _activeMLContext = this;
-                       if(_monitorUtils != null) {
-                               _monitorUtils.resetMonitoringData();
-                       }
                        
                        if( OptimizerUtils.isSparkExecutionMode() ) {
                                // Depending on whether 
registerInput/registerOutput was called initialize the variables 
@@ -1589,9 +1509,6 @@ public class MLContext {
                
                //read dml script string
                String dmlScriptStr = DMLScript.readDMLScript( 
isFile?"-f":"-s", dmlScriptFilePath);
-               if(_monitorUtils != null) {
-                       _monitorUtils.setDMLString(dmlScriptStr);
-               }
                
                //simplified compilation chain
                _rtprog = null;
@@ -1653,9 +1570,6 @@ public class MLContext {
                //core execute runtime program  
                _rtprog.execute( ec );
                
-               if(_monitorUtils != null)
-                       
_monitorUtils.setExplainOutput(Explain.explain(_rtprog));
-               
                return ec;
        }
        
@@ -1680,4 +1594,4 @@ public class MLContext {
                MatrixCharacteristics mcOut = 
out.getMatrixCharacteristics("output");
                return MLMatrix.createMLMatrix(this, sqlContext, blocks, mcOut);
        }       
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/MLContextProxy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContextProxy.java 
b/src/main/java/org/apache/sysml/api/MLContextProxy.java
index 88423b8..db87230 100644
--- a/src/main/java/org/apache/sysml/api/MLContextProxy.java
+++ b/src/main/java/org/apache/sysml/api/MLContextProxy.java
@@ -22,11 +22,9 @@ package org.apache.sysml.api;
 import java.util.ArrayList;
 
 import org.apache.sysml.api.mlcontext.MLContextException;
-import org.apache.sysml.api.monitoring.Location;
 import org.apache.sysml.parser.Expression;
 import org.apache.sysml.parser.LanguageException;
 import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 
 /**
  * The purpose of this proxy is to shield systemml internals from direct 
access to MLContext
@@ -87,41 +85,5 @@ public class MLContextProxy
                throw new MLContextException("No MLContext object is currently 
active. Have you created one? "
                                + "Hint: in Scala, 'val ml = new 
MLContext(sc)'", true);
        }
-
-       @SuppressWarnings("deprecation")
-       public static void setInstructionForMonitoring(Instruction inst) {
-               Location loc = inst.getLocation();
-               if (loc == null) {
-                       return;
-               }
-               
-               if (org.apache.sysml.api.MLContext.getActiveMLContext() != 
null) {
-                       org.apache.sysml.api.MLContext mlContext = 
org.apache.sysml.api.MLContext.getActiveMLContext();
-                       if(mlContext.getMonitoringUtil() != null) {
-                               
mlContext.getMonitoringUtil().setInstructionLocation(loc, inst);
-                       }
-               } else if 
(org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
-                       org.apache.sysml.api.mlcontext.MLContext mlContext = 
org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext();
-                       if(mlContext.getSparkMonitoringUtil() != null) {
-                               
mlContext.getSparkMonitoringUtil().setInstructionLocation(loc, inst);
-                       }
-               }
-       }
-       
-       @SuppressWarnings("deprecation")
-       public static void addRDDForInstructionForMonitoring(SPInstruction 
inst, Integer rddID) {
-               
-               if (org.apache.sysml.api.MLContext.getActiveMLContext() != 
null) {
-                       org.apache.sysml.api.MLContext mlContext = 
org.apache.sysml.api.MLContext.getActiveMLContext();
-                       if(mlContext.getMonitoringUtil() != null) {
-                               
mlContext.getMonitoringUtil().addRDDForInstruction(inst, rddID);
-                       }
-               } else if 
(org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
-                       org.apache.sysml.api.mlcontext.MLContext mlContext = 
org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext();
-                       if(mlContext.getSparkMonitoringUtil() != null) {
-                               
mlContext.getSparkMonitoringUtil().addRDDForInstruction(inst, rddID);
-                       }
-               }
-       }
        
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java
index 17cb92b..48f013e 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java
@@ -32,7 +32,6 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.MLContextProxy;
 import org.apache.sysml.api.jmlc.JMLCUtils;
-import org.apache.sysml.api.monitoring.SparkMonitoringUtil;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.parser.DataExpression;
@@ -46,7 +45,6 @@ import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
-import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.utils.Explain.ExplainType;
@@ -68,11 +66,6 @@ public class MLContext {
        private SparkContext sc = null;
 
        /**
-        * SparkMonitoringUtil monitors SystemML performance on Spark.
-        */
-       private SparkMonitoringUtil sparkMonitoringUtil = null;
-
-       /**
         * Reference to the currently executing script.
         */
        private Script executingScript = null;
@@ -232,12 +225,6 @@ public class MLContext {
 
                MLContextUtil.setDefaultConfig();
                MLContextUtil.setCompilerConfig();
-
-               if (monitorPerformance) {
-                       SparkListener sparkListener = new SparkListener(sc);
-                       sparkMonitoringUtil = new 
SparkMonitoringUtil(sparkListener);
-                       sc.addSparkListener(sparkListener);
-               }
        }
 
        /**
@@ -273,7 +260,7 @@ public class MLContext {
         * @return the results as a MLResults object
         */
        public MLResults execute(Script script) {
-               ScriptExecutor scriptExecutor = new 
ScriptExecutor(sparkMonitoringUtil);
+               ScriptExecutor scriptExecutor = new ScriptExecutor();
                scriptExecutor.setExplain(explain);
                scriptExecutor.setExplainLevel(explainLevel);
                scriptExecutor.setStatistics(statistics);
@@ -326,15 +313,6 @@ public class MLContext {
        }
 
        /**
-        * Obtain the SparkMonitoringUtil if it is available.
-        *
-        * @return the SparkMonitoringUtil if it is available.
-        */
-       public SparkMonitoringUtil getSparkMonitoringUtil() {
-               return sparkMonitoringUtil;
-       }
-
-       /**
         * Obtain the SparkContext associated with this MLContext.
         *
         * @return the SparkContext associated with this MLContext.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java 
b/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java
index 734475e..5ee8622 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java
@@ -27,7 +27,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.jmlc.JMLCUtils;
 import org.apache.sysml.api.mlcontext.MLContext.ExplainLevel;
-import org.apache.sysml.api.monitoring.SparkMonitoringUtil;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.HopsException;
@@ -108,7 +107,6 @@ import org.apache.sysml.utils.Statistics;
 public class ScriptExecutor {
 
        protected DMLConfig config;
-       protected SparkMonitoringUtil sparkMonitoringUtil;
        protected DMLProgram dmlProgram;
        protected DMLTranslator dmlTranslator;
        protected Program runtimeProgram;
@@ -141,32 +139,6 @@ public class ScriptExecutor {
        }
 
        /**
-        * ScriptExecutor constructor, where a SparkMonitoringUtil object is 
passed
-        * in.
-        * 
-        * @param sparkMonitoringUtil
-        *            SparkMonitoringUtil object to monitor Spark
-        */
-       public ScriptExecutor(SparkMonitoringUtil sparkMonitoringUtil) {
-               this();
-               this.sparkMonitoringUtil = sparkMonitoringUtil;
-       }
-
-       /**
-        * ScriptExecutor constructor, where the configuration properties and a
-        * SparkMonitoringUtil object are passed in.
-        * 
-        * @param config
-        *            the configuration properties to use by the ScriptExecutor
-        * @param sparkMonitoringUtil
-        *            SparkMonitoringUtil object to monitor Spark
-        */
-       public ScriptExecutor(DMLConfig config, SparkMonitoringUtil 
sparkMonitoringUtil) {
-               this.config = config;
-               this.sparkMonitoringUtil = sparkMonitoringUtil;
-       }
-
-       /**
         * Construct DAGs of high-level operators (HOPs) for each block of
         * statements.
         */
@@ -318,7 +290,6 @@ public class ScriptExecutor {
                cleanupRuntimeProgram();
                createAndInitializeExecutionContext();
                executeRuntimeProgram();
-               setExplainRuntimeProgramInSparkMonitor();
                cleanupAfterExecution();
 
                // add symbol table to MLResults
@@ -344,7 +315,6 @@ public class ScriptExecutor {
                this.script = script;
                checkScriptHasTypeAndString();
                script.setScriptExecutor(this);
-               setScriptStringInSparkMonitor();
                // Set global variable indicating the script type
                DMLScript.SCRIPT_TYPE = script.getScriptType();
        }
@@ -403,15 +373,6 @@ public class ScriptExecutor {
        }
 
        /**
-        * Obtain the SparkMonitoringUtil object.
-        * 
-        * @return the SparkMonitoringUtil object, if available
-        */
-       public SparkMonitoringUtil getSparkMonitoringUtil() {
-               return sparkMonitoringUtil;
-       }
-
-       /**
         * Check security, create scratch space, cleanup working directories,
         * initialize caching, and reset statistics.
         */
@@ -500,41 +461,6 @@ public class ScriptExecutor {
        }
 
        /**
-        * Set the explanation of the runtime program in the 
SparkMonitoringUtil if
-        * it exists.
-        */
-       protected void setExplainRuntimeProgramInSparkMonitor() {
-               if (sparkMonitoringUtil != null) {
-                       try {
-                               String explainOutput = 
Explain.explain(runtimeProgram);
-                               
sparkMonitoringUtil.setExplainOutput(explainOutput);
-                       } catch (HopsException e) {
-                               throw new MLContextException("Exception 
occurred while explaining runtime program", e);
-                       }
-               }
-
-       }
-
-       /**
-        * Set the script string in the SparkMonitoringUtil if it exists.
-        */
-       protected void setScriptStringInSparkMonitor() {
-               if (sparkMonitoringUtil != null) {
-                       
sparkMonitoringUtil.setDMLString(script.getScriptString());
-               }
-       }
-
-       /**
-        * Set the SparkMonitoringUtil object.
-        * 
-        * @param sparkMonitoringUtil
-        *            The SparkMonitoringUtil object
-        */
-       public void setSparkMonitoringUtil(SparkMonitoringUtil 
sparkMonitoringUtil) {
-               this.sparkMonitoringUtil = sparkMonitoringUtil;
-       }
-
-       /**
         * Liveness analysis is performed on the program, obtaining sets of 
live-in
         * and live-out variables by forward and backward passes over the 
program.
         */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java 
b/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java
deleted file mode 100644
index 485ada4..0000000
--- a/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysml.api.monitoring;
-
-import java.util.Comparator;
-import java.util.HashMap;
-
-public class InstructionComparator implements Comparator<String>{
-
-       HashMap<String, Long> instructionCreationTime;
-       public InstructionComparator(HashMap<String, Long> 
instructionCreationTime) {
-               this.instructionCreationTime = instructionCreationTime;
-       }
-       
-       @Override
-       public int compare(String o1, String o2) {
-               return instructionCreationTime.get(o1)
-                       .compareTo(instructionCreationTime.get(o2));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/Location.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/monitoring/Location.java 
b/src/main/java/org/apache/sysml/api/monitoring/Location.java
deleted file mode 100644
index ddc9749..0000000
--- a/src/main/java/org/apache/sysml/api/monitoring/Location.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysml.api.monitoring;
-
-public class Location implements Comparable<Location> {
-       public int beginLine;
-       public int endLine;
-       public int beginCol;
-       public int endCol;
-       public Location(int beginLine, int endLine, int beginCol, int endCol) {
-               this.beginLine = beginLine;
-               this.endLine = endLine;
-               this.beginCol = beginCol;
-               this.endCol = endCol;
-       }
-       
-       @Override
-       public boolean equals(Object other) {
-               if(!( other instanceof Location ))
-                       return false;
-               Location loc = (Location) other;
-               return loc.beginLine == beginLine && loc.endLine == endLine 
-                               && loc.beginCol == beginCol && loc.endCol == 
endCol;
-       }
-       
-       public String toString() {
-               return beginLine + ":" + beginCol + ", " + endLine + ":" + 
endCol;
-       }
-
-       @Override
-       public int compareTo(Location loc) {
-               int ret1 = Integer.compare(loc.beginLine, beginLine);
-               int ret2 = Integer.compare(loc.endLine, endLine);
-               int ret3 = Integer.compare(loc.beginCol, beginCol);
-               int ret4 = Integer.compare(loc.endCol, endCol);
-               
-               return (ret1 != 0) ? ret1 : (ret2 != 0) ? ret2 : (ret3 != 0) ? 
ret3 : ret4;   
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java 
b/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java
deleted file mode 100644
index 42a1f32..0000000
--- a/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysml.api.monitoring;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.sysml.lops.Lop;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
-import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
-
-import scala.collection.Seq;
-import scala.xml.Node;
-
-/**
- * Usage guide:
- * MLContext mlCtx = new MLContext(sc, true);
- * mlCtx.register...
- * mlCtx.execute(...)
- * mlCtx.getMonitoringUtil().getRuntimeInfoInHTML("runtime.html");
- */
-public class SparkMonitoringUtil {
-       private HashMap<String, String> lineageInfo = new HashMap<String, 
String>();    // instruction -> lineageInfo
-       private HashMap<String, Long> instructionCreationTime = new 
HashMap<String, Long>();
-       private MultiMap<Location, String> instructions = new 
MultiMap<Location, String>();
-       private MultiMap<String, Integer> stageIDs = new MultiMap<String, 
Integer>();
-       private MultiMap<String, Integer> jobIDs = new MultiMap<String, 
Integer>();
-       private MultiMap<Integer, String> rddInstructionMapping = new 
MultiMap<Integer, String>();
-       
-       private HashSet<String> getRelatedInstructions(int stageID) {
-               HashSet<String> retVal = new HashSet<String>();
-               if(_sparkListener != null) {
-                       ArrayList<Integer> rdds = 
_sparkListener.stageRDDMapping.get(stageID);
-                       for(Integer rddID : rdds) {
-                               retVal.addAll(rddInstructionMapping.get(rddID));
-                       }
-               }
-               return retVal;
-       }
-       
-       private SparkListener _sparkListener = null;
-       public SparkListener getSparkListener() {
-               return _sparkListener;
-       }
-       
-       private String explainOutput = "";
-       
-       public String getExplainOutput() {
-               return explainOutput;
-       }
-
-       public void setExplainOutput(String explainOutput) {
-               this.explainOutput = explainOutput;
-       }
-
-       public SparkMonitoringUtil(SparkListener sparkListener) {
-               _sparkListener = sparkListener;
-       }
-       
-       public void addCurrentInstruction(SPInstruction inst) {
-               if(_sparkListener != null) {
-                       _sparkListener.addCurrentInstruction(inst);
-               }
-       }
-       
-       public void addRDDForInstruction(SPInstruction inst, Integer rddID) {
-               this.rddInstructionMapping.put(rddID, 
getInstructionString(inst));
-       }
-       
-       public void removeCurrentInstruction(SPInstruction inst) {
-               if(_sparkListener != null) {
-                       _sparkListener.removeCurrentInstruction(inst);
-               }
-       }
-       
-       public void setDMLString(String dmlStr) {
-               this.dmlStrForMonitoring = dmlStr;
-       }
-       
-       public void resetMonitoringData() {
-               if(_sparkListener != null && _sparkListener.stageDAGs != null)
-                       _sparkListener.stageDAGs.clear();
-               if(_sparkListener != null && _sparkListener.stageTimeline != 
null)
-                       _sparkListener.stageTimeline.clear();
-       }
-       
-       // public Multimap<Location, String> hops = ArrayListMultimap.create(); 
TODO:
-       private String dmlStrForMonitoring = null;
-       public void getRuntimeInfoInHTML(String htmlFilePath) throws 
DMLRuntimeException, IOException {
-               String jsAndCSSFiles = "<script 
src=\"js/lodash.min.js\"></script>"
-                               + "<script 
src=\"js/jquery-1.11.1.min.js\"></script>"
-                               + "<script src=\"js/d3.min.js\"></script>"
-                               + "<script 
src=\"js/bootstrap-tooltip.js\"></script>"
-                               + "<script src=\"js/dagre-d3.min.js\"></script>"
-                               + "<script 
src=\"js/graphlib-dot.min.js\"></script>"
-                               + "<script 
src=\"js/spark-dag-viz.js\"></script>"
-                               + "<script 
src=\"js/timeline-view.js\"></script>"
-                               + "<script src=\"js/vis.min.js\"></script>"
-                               + "<link rel=\"stylesheet\" 
href=\"css/bootstrap.min.css\">"
-                               + "<link rel=\"stylesheet\" 
href=\"css/vis.min.css\">"
-                               + "<link rel=\"stylesheet\" 
href=\"css/spark-dag-viz.css\">"
-                               + "<link rel=\"stylesheet\" 
href=\"css/timeline-view.css\"> ";
-               BufferedWriter bw = new BufferedWriter(new 
FileWriter(htmlFilePath));
-               bw.write("<html><head>\n");
-               bw.write(jsAndCSSFiles + "\n");
-               bw.write("</head><body>\n<table border=1>\n");
-               
-               bw.write("<tr>\n");
-               bw.write("<td><b>Position in script</b></td>\n");
-               bw.write("<td><b>DML</b></td>\n");
-               bw.write("<td><b>Instruction</b></td>\n");
-               bw.write("<td><b>StageIDs</b></td>\n");
-               bw.write("<td><b>RDD Lineage</b></td>\n");
-               bw.write("</tr>\n");
-               
-               for(Location loc : instructions.keySet()) {
-                       String dml = getExpression(loc);
-                       
-                       // Sort the instruction with time - so as to separate 
recompiled instructions
-                       List<String> listInst = new 
ArrayList<String>(instructions.get(loc));
-                       Collections.sort(listInst, new 
InstructionComparator(instructionCreationTime));
-                       
-                       if(dml != null && dml.trim().length() > 1) {
-                               bw.write("<tr>\n");
-                               int rowSpan = listInst.size();
-                               bw.write("<td rowspan=\"" + rowSpan + "\">" + 
loc.toString() + "</td>\n");
-                               bw.write("<td rowspan=\"" + rowSpan + "\">" + 
dml + "</td>\n");
-                               boolean firstTime = true;
-                               for(String inst : listInst) {
-                                       if(!firstTime)
-                                               bw.write("<tr>\n");
-                                       
-                                       if(inst.startsWith("SPARK"))
-                                               bw.write("<td 
style=\"color:red\">" + inst + "</td>\n");
-                                       else if(isInterestingCP(inst))
-                                               bw.write("<td 
style=\"color:blue\">" + inst + "</td>\n");
-                                       else
-                                               bw.write("<td>" + inst + 
"</td>\n");
-                                       
-                                       bw.write("<td>" + 
getStageIDAsString(inst) + "</td>\n");
-                                       if(lineageInfo.containsKey(inst))
-                                               bw.write("<td>" + 
lineageInfo.get(inst).replaceAll("\n", "<br />") + "</td>\n");
-                                       else
-                                               bw.write("<td></td>\n");
-                                       
-                                       bw.write("</tr>\n");
-                                       firstTime = false;
-                               }
-                               
-                       }
-                       
-               }
-               
-               bw.write("</table></body>\n</html>");
-               bw.close();
-       }
-       
-       private String getInQuotes(String str) {
-               return "\"" + str + "\"";
-       }
-       private String getEscapedJSON(String json) {
-               if(json == null)
-                       return "";
-               else {
-                       return json
-                                       //.replaceAll("\\\\", "\\\\\\")
-                                       .replaceAll("\\t", "\\\\t")
-                                       .replaceAll("/", "\\\\/")
-                                       .replaceAll("\"", "\\\\\"")
-                                       .replaceAll("\\r?\\n", "\\\\n");
-               }
-       }
-       
-       private long maxExpressionExecutionTime = 0;
-       HashMap<Integer, Long> stageExecutionTimes = new HashMap<Integer, 
Long>();
-       HashMap<String, Long> expressionExecutionTimes = new HashMap<String, 
Long>();
-       HashMap<String, Long> instructionExecutionTimes = new HashMap<String, 
Long>();
-       HashMap<Integer, HashSet<String>> relatedInstructionsPerStage = new 
HashMap<Integer, HashSet<String>>();
-       private void fillExecutionTimes() {
-               stageExecutionTimes.clear();
-               expressionExecutionTimes.clear();
-               for(Location loc : instructions.keySet()) {
-                       List<String> listInst = new 
ArrayList<String>(instructions.get(loc));
-                       long expressionExecutionTime = 0;
-                       
-                       for(String inst : listInst) {
-                               long instructionExecutionTime = 0;
-                               for(Integer stageId : stageIDs.get(inst)) {
-                                       try {
-                                               
if(getStageExecutionTime(stageId) != null) {
-                                                       long stageExecTime = 
getStageExecutionTime(stageId);
-                                                       
instructionExecutionTime += stageExecTime;
-                                                       expressionExecutionTime 
+= stageExecTime;
-                                                       
stageExecutionTimes.put(stageId, stageExecTime);
-                                               }
-                                       }
-                                       catch(Exception e) {}
-
-                                       
relatedInstructionsPerStage.put(stageId, getRelatedInstructions(stageId));
-                               }
-                               instructionExecutionTimes.put(inst, 
instructionExecutionTime);
-                       }
-                       expressionExecutionTime /= listInst.size(); // average
-                       maxExpressionExecutionTime = 
Math.max(maxExpressionExecutionTime, expressionExecutionTime);
-                       expressionExecutionTimes.put(loc.toString(), 
expressionExecutionTime);
-               }
-               
-               // Now fill empty instructions
-               for(Entry<String, Long> kv : 
instructionExecutionTimes.entrySet()) {
-                       if(kv.getValue() == 0) {
-                               // Find all stages that contain this as related 
instruction
-                               long sumExecutionTime = 0;
-                               for(Entry<Integer, HashSet<String>> kv1 : 
relatedInstructionsPerStage.entrySet()) {
-                                       
if(kv1.getValue().contains(kv.getKey())) {
-                                               sumExecutionTime += 
stageExecutionTimes.get(kv1.getKey());
-                                       }
-                               }
-                               kv.setValue(sumExecutionTime);
-                       }
-               }
-               
-               for(Location loc : instructions.keySet()) {
-                       if(expressionExecutionTimes.get(loc.toString()) == 0) {
-                               List<String> listInst = new 
ArrayList<String>(instructions.get(loc));
-                               long expressionExecutionTime = 0;
-                               for(String inst : listInst) {
-                                       expressionExecutionTime += 
instructionExecutionTimes.get(inst);
-                               }
-                               expressionExecutionTime /= listInst.size(); // 
average
-                               maxExpressionExecutionTime = 
Math.max(maxExpressionExecutionTime, expressionExecutionTime);
-                               expressionExecutionTimes.put(loc.toString(), 
expressionExecutionTime);
-                       }
-               }
-               
-       }
-
-       public String getRuntimeInfoInJSONFormat() throws DMLRuntimeException, 
IOException {
-               StringBuilder retVal = new StringBuilder("{\n");
-               
-               retVal.append(getInQuotes("dml") + ":" + 
getInQuotes(getEscapedJSON(dmlStrForMonitoring)) + ",\n");
-               retVal.append(getInQuotes("expressions") + ":" + "[\n");
-               
-               boolean isFirstExpression = true;
-               fillExecutionTimes();
-               
-               for(Location loc : instructions.keySet()) {
-                       String dml = getEscapedJSON(getExpressionInJSON(loc));
-                       
-                       if(dml != null) {
-                               // Sort the instruction with time - so as to 
separate recompiled instructions
-                               List<String> listInst = new 
ArrayList<String>(instructions.get(loc));
-                               Collections.sort(listInst, new 
InstructionComparator(instructionCreationTime));
-                               
-                               if(!isFirstExpression) {
-                                       retVal.append(",\n");
-                               }
-                               retVal.append("{\n");
-                               isFirstExpression = false;
-                               
-                               retVal.append(getInQuotes("beginLine") + ":" + 
loc.beginLine + ",\n");
-                               retVal.append(getInQuotes("beginCol") + ":" + 
loc.beginCol + ",\n");
-                               retVal.append(getInQuotes("endLine") + ":" + 
loc.endLine + ",\n");
-                               retVal.append(getInQuotes("endCol") + ":" + 
loc.endCol + ",\n");
-                               
-                               long expressionExecutionTime = 
expressionExecutionTimes.get(loc.toString());
-                               
retVal.append(getInQuotes("expressionExecutionTime") + ":" + 
expressionExecutionTime + ",\n");
-                               
retVal.append(getInQuotes("expressionHeavyHitterFactor") + ":" + 
((double)expressionExecutionTime / (double)maxExpressionExecutionTime) + ",\n");
-                               
-                               retVal.append(getInQuotes("expression") + ":" + 
getInQuotes(dml) + ",\n");
-                               
-                               retVal.append(getInQuotes("instructions") + ":" 
+ "[\n");
-                       
-                               boolean firstTime = true;
-                               for(String inst : listInst) {
-                                       
-                                       if(!firstTime)
-                                               retVal.append(", {");
-                                       else
-                                               retVal.append("{");
-                                       
-                                       if(inst.startsWith("SPARK")) {
-                                               
retVal.append(getInQuotes("isSpark") + ":" + "true,\n"); 
-                                       }
-                                       else if(isInterestingCP(inst)) {
-                                               
retVal.append(getInQuotes("isInteresting") + ":" + "true,\n");
-                                       }
-                                       
-                                       
retVal.append(getStageIDAsJSONString(inst) + "\n");
-                                       if(lineageInfo.containsKey(inst)) {
-                                               
retVal.append(getInQuotes("lineageInfo") + ":" + 
getInQuotes(getEscapedJSON(lineageInfo.get(inst))) + ",\n");
-                                       }
-                                       
-                                       
retVal.append(getInQuotes("instruction") + ":" + 
getInQuotes(getEscapedJSON(inst)));
-                                       retVal.append("}");
-                                       firstTime = false;
-                               }
-                               
-                               retVal.append("]\n");
-                               retVal.append("}\n");
-                       }
-                       
-               }
-               
-               return retVal.append("]\n}").toString();
-       }
-       
-       
-       private boolean isInterestingCP(String inst) {
-               if(inst.startsWith("CP rmvar") || inst.startsWith("CP cpvar") 
|| inst.startsWith("CP mvvar"))
-                       return false;
-               else if(inst.startsWith("CP"))
-                       return true;
-               else
-                       return false;
-       }
-       
-       private String getStageIDAsString(String instruction) {
-               String retVal = "";
-               for(Integer stageId : stageIDs.get(instruction)) {
-                       String stageDAG = "";
-                       String stageTimeLine = "";
-                       
-                       if(getStageDAGs(stageId) != null) {
-                               stageDAG = getStageDAGs(stageId).toString();
-                       }
-                       
-                       if(getStageTimeLine(stageId) != null) {
-                               stageTimeLine = 
getStageTimeLine(stageId).toString();
-                       }
-                       
-                       retVal +=  "Stage:" + stageId + 
-                                       " ("
-                                       + "<div>" 
-                                               + 
stageDAG.replaceAll("toggleDagViz\\(false\\)", "toggleDagViz(false, this)") 
-                                       + "</div>, "
-                                       + "<div id=\"timeline-" + stageId + 
"\">"
-                                               + stageTimeLine
-                                                       
.replaceAll("drawTaskAssignmentTimeline\\(", "registerTimelineData(" + stageId 
+ ", ")
-                                                       
.replaceAll("class=\"expand-task-assignment-timeline\"",  
"class=\"expand-task-assignment-timeline\" 
onclick=\"toggleStageTimeline(this)\"")
-                                       + "</div>"
-                                       + ")"; 
-               }
-               return retVal;
-       }
-       
-       private String getStageIDAsJSONString(String instruction) {
-               long instructionExecutionTime = 
instructionExecutionTimes.get(instruction);
-               
-               StringBuilder retVal = new 
StringBuilder(getInQuotes("instructionExecutionTime") + ":" + 
instructionExecutionTime + ",\n");
-               
-               boolean isFirst = true;
-               if(stageIDs.get(instruction).size() == 0) {
-                       // Find back references
-                       HashSet<Integer> relatedStages = new HashSet<Integer>();
-                       for(Entry<Integer, HashSet<String>> kv : 
relatedInstructionsPerStage.entrySet()) {
-                               if(kv.getValue().contains(instruction)) {
-                                       relatedStages.add(kv.getKey());
-                               }
-                       }
-                       HashSet<String> relatedInstructions = new 
HashSet<String>();
-                       for(Entry<String, Integer> kv : stageIDs.entries()) {
-                               if(relatedStages.contains(kv.getValue())) {
-                                       relatedInstructions.add(kv.getKey());
-                               }
-                       }
-                       
-                       retVal.append(getInQuotes("backReferences") + ": [\n");
-                       boolean isFirstRelInst = true;
-                       for(String relInst : relatedInstructions) {
-                               if(!isFirstRelInst) {
-                                       retVal.append(",\n");
-                               }
-                               retVal.append(getInQuotes(relInst));
-                               isFirstRelInst = false;
-                       }
-                       retVal.append("], \n");
-               }
-               else {
-                       retVal.append(getInQuotes("stages") + ": {");
-                       for(Integer stageId : stageIDs.get(instruction)) {
-                               String stageDAG = "";
-                               String stageTimeLine = "";
-                               
-                               if(getStageDAGs(stageId) != null) {
-                                       stageDAG = 
getStageDAGs(stageId).toString();
-                               }
-                               
-                               if(getStageTimeLine(stageId) != null) {
-                                       stageTimeLine = 
getStageTimeLine(stageId).toString();
-                               }
-                               
-                               long stageExecutionTime = 
stageExecutionTimes.get(stageId);
-                               if(!isFirst) {
-                                       retVal.append(",\n");
-                               }
-                               
-                               retVal.append(getInQuotes("" + stageId) + ": 
{");
-                               
-                               // Now add related instructions
-                               HashSet<String> relatedInstructions = 
relatedInstructionsPerStage.get(stageId);
-                               
-                               
retVal.append(getInQuotes("relatedInstructions") + ": [\n");
-                               boolean isFirstRelInst = true;
-                               for(String relInst : relatedInstructions) {
-                                       if(!isFirstRelInst) {
-                                               retVal.append(",\n");
-                                       }
-                                       retVal.append(getInQuotes(relInst));
-                                       isFirstRelInst = false;
-                               }
-                               retVal.append("],\n");
-                               
-                               retVal.append(getInQuotes("DAG") + ":")
-                                         .append(
-                                                       getInQuotes(
-                                                       
getEscapedJSON(stageDAG.replaceAll("toggleDagViz\\(false\\)", 
"toggleDagViz(false, this)")) 
-                                                       ) + ",\n"
-                                                       )
-                                         
.append(getInQuotes("stageExecutionTime") + ":" + stageExecutionTime + ",\n")
-                                         .append(getInQuotes("timeline") + ":")
-                                         .append(
-                                                       getInQuotes(
-                                                               getEscapedJSON(
-                                                               stageTimeLine
-                                                               
.replaceAll("drawTaskAssignmentTimeline\\(", "registerTimelineData(" + stageId 
+ ", ")
-                                                               
.replaceAll("class=\"expand-task-assignment-timeline\"",  
"class=\"expand-task-assignment-timeline\" 
onclick=\"toggleStageTimeline(this)\""))
-                                                               )
-                                                        )
-                                         .append("}");
-                               
-                               isFirst = false;
-                       }
-                       retVal.append("}, ");
-               }
-               
-               
-               retVal.append(getInQuotes("jobs") + ": {");
-               isFirst = true;
-               for(Integer jobId : jobIDs.get(instruction)) {
-                       String jobDAG = "";
-                       
-                       if(getJobDAGs(jobId) != null) {
-                               jobDAG = getJobDAGs(jobId).toString();
-                       }
-                       if(!isFirst) {
-                               retVal.append(",\n");
-                       }
-                       
-                       retVal.append(getInQuotes("" + jobId) + ": {")
-                                       .append(getInQuotes("DAG") + ":" ) 
-                                       .append(getInQuotes(
-                                               
getEscapedJSON(jobDAG.replaceAll("toggleDagViz\\(true\\)", "toggleDagViz(true, 
this)")) 
-                                               ) + "}\n");
-                       
-                       isFirst = false;
-               }
-               retVal.append("}, ");
-               
-               return retVal.toString();
-       }
-
-       
-       String [] dmlLines = null;
-       private String getExpression(Location loc) {
-               try {
-                       if(dmlLines == null) {
-                               dmlLines =  
dmlStrForMonitoring.split("\\r?\\n");
-                       }
-                       if(loc.beginLine == loc.endLine) {
-                               return 
dmlLines[loc.beginLine-1].substring(loc.beginCol-1, loc.endCol);
-                       }
-                       else {
-                               String retVal = 
dmlLines[loc.beginLine-1].substring(loc.beginCol-1);
-                               for(int i = loc.beginLine+1; i < loc.endLine; 
i++) {
-                                       retVal += "<br />" +  dmlLines[i-1];
-                               }
-                               retVal += "<br />" + 
dmlLines[loc.endLine-1].substring(0, loc.endCol);
-                               return retVal;
-                       }
-               }
-               catch(Exception e) {
-                       return null; // "[[" + loc.beginLine + "," + 
loc.endLine + "," + loc.beginCol + "," + loc.endCol + "]]";
-               }
-       }
-       
-       
-       private String getExpressionInJSON(Location loc) {
-               try {
-                       if(dmlLines == null) {
-                               dmlLines =  
dmlStrForMonitoring.split("\\r?\\n");
-                       }
-                       if(loc.beginLine == loc.endLine) {
-                               return 
dmlLines[loc.beginLine-1].substring(loc.beginCol-1, loc.endCol);
-                       }
-                       else {
-                               String retVal = 
dmlLines[loc.beginLine-1].substring(loc.beginCol-1);
-                               for(int i = loc.beginLine+1; i < loc.endLine; 
i++) {
-                                       retVal += "\\n" +  dmlLines[i-1];
-                               }
-                               retVal += "\\n" + 
dmlLines[loc.endLine-1].substring(0, loc.endCol);
-                               return retVal;
-                       }
-               }
-               catch(Exception e) {
-                       return null; // "[[" + loc.beginLine + "," + 
loc.endLine + "," + loc.beginCol + "," + loc.endCol + "]]";
-               }
-       }
-       
-       public Seq<Node> getStageDAGs(int stageIDs) {
-               if(_sparkListener == null || _sparkListener.stageDAGs == null)
-                       return null;
-               else
-                       return _sparkListener.stageDAGs.get(stageIDs);
-       }
-       
-       public Long getStageExecutionTime(int stageID) {
-               if(_sparkListener == null || _sparkListener.stageDAGs == null)
-                       return null;
-               else
-                       return _sparkListener.stageExecutionTime.get(stageID);
-       }
-       
-       public Seq<Node> getJobDAGs(int jobID) {
-               if(_sparkListener == null || _sparkListener.jobDAGs == null)
-                       return null;
-               else
-                       return _sparkListener.jobDAGs.get(jobID);
-       }
-       
-       public Seq<Node> getStageTimeLine(int stageIDs) {
-               if(_sparkListener == null || _sparkListener.stageTimeline == 
null)
-                       return null;
-               else
-                       return _sparkListener.stageTimeline.get(stageIDs);
-       }
-       public void setLineageInfo(Instruction inst, String plan) {
-               lineageInfo.put(getInstructionString(inst), plan);
-       }
-       public void setStageId(Instruction inst, int stageId) {
-               stageIDs.put(getInstructionString(inst), stageId);
-       }
-       public void setJobId(Instruction inst, int jobId) {
-               jobIDs.put(getInstructionString(inst), jobId);
-       }
-       public void setInstructionLocation(Location loc, Instruction inst) {
-               String instStr = getInstructionString(inst);
-               instructions.put(loc, instStr);
-               instructionCreationTime.put(instStr, 
System.currentTimeMillis());
-       }
-       private String getInstructionString(Instruction inst) {
-               String tmp = inst.toString();
-               tmp = tmp.replaceAll(Lop.OPERAND_DELIMITOR, " ");
-               tmp = tmp.replaceAll(Lop.DATATYPE_PREFIX, ".");
-               tmp = tmp.replaceAll(Lop.INSTRUCTION_DELIMITOR, ", ");
-               return tmp;
-       }
-
-       public class MultiMap<K, V extends Comparable<V>> {
-               private SortedMap<K, List<V>> m = new TreeMap<K, List<V>>();
-
-               public MultiMap(){
-               }
-               
-               public void put(K key, V value) {
-                       List<V> list;
-                       if (!m.containsKey(key)) {
-                               list = new ArrayList<V>();
-                               m.put(key, list);
-                       } else {
-                               list = m.get(key);
-                       }
-                       list.add(value);
-                       Collections.sort(list);
-               }
-
-               public Collection<Entry<K, V>> entries() {
-                       // the treemap is sorted and the lists are sorted, so 
can traverse
-                       // to generate a key/value ordered list of all entries.
-                       Collection<Entry<K, V>> allEntries = new 
ArrayList<Entry<K, V>>();
-
-                       for (K key : m.keySet()) {
-                               List<V> list = m.get(key);
-                               for (V value : list) {
-                                       Entry<K, V> listEntry = new 
SimpleEntry<K, V>(key, value);
-                                       allEntries.add(listEntry);
-                               }
-                       }
-
-                       return allEntries;
-               }
-
-               public List<V> get(K key) {
-                       return m.get(key);
-               }
-
-               public Set<K> keySet() {
-                       return m.keySet();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 8e5e4dc..bbc9fb3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1263,9 +1263,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                
                if( parentLineage == null || parentLineage.getRDD() == null )
                        return;
-               
-               MLContextProxy.addRDDForInstructionForMonitoring(inst, 
parentLineage.getRDD().id());
-               
+       
                JavaPairRDD<?, ?> out = parentLineage.getRDD();
                JavaPairRDD<?, ?> in1 = null; 
                JavaPairRDD<?, ?> in2 = null;
@@ -1344,29 +1342,6 @@ public class SparkExecutionContext extends 
ExecutionContext
                        }
                }
                
-               
-               Object mlContextObj = MLContextProxy.getActiveMLContext();
-               if (mlContextObj != null) {
-                       if (mlContextObj instanceof 
org.apache.sysml.api.MLContext) {
-                               org.apache.sysml.api.MLContext mlCtx = 
(org.apache.sysml.api.MLContext) mlContextObj;
-                               if (mlCtx.getMonitoringUtil() != null) {
-                                       
mlCtx.getMonitoringUtil().setLineageInfo(inst, outDebugString);
-                               } else {
-                                       throw new DMLRuntimeException("The 
method setLineageInfoForExplain should be called only through MLContext");
-                               }
-                       } else if (mlContextObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                               org.apache.sysml.api.mlcontext.MLContext mlCtx 
= (org.apache.sysml.api.mlcontext.MLContext) mlContextObj;
-                               if (mlCtx.getSparkMonitoringUtil() != null) {
-                                       
mlCtx.getSparkMonitoringUtil().setLineageInfo(inst, outDebugString);
-                               } else {
-                                       throw new DMLRuntimeException("The 
method setLineageInfoForExplain should be called only through MLContext");
-                               }
-                       }
-                       
-               } else {
-                       throw new DMLRuntimeException("The method 
setLineageInfoForExplain should be called only through MLContext");
-               }
-               
        }
        
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java
index fae12ee..0681f14 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java
@@ -22,7 +22,6 @@ package org.apache.sysml.runtime.instructions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.sysml.api.monitoring.Location;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.parser.DataIdentifier;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -112,16 +111,6 @@ public abstract class Instruction
                }
        }
        
-       public Location getLocation() {
-               // Rather than exposing 4 different getter methods. Also 
Location doesnot contain any references to Spark libraries
-               if(beginLine == -1 || endLine == -1 || beginCol == -1 || endCol 
== -1) {
-                       return null;
-               }
-               else
-                       return new Location(beginLine, endLine, beginCol, 
endCol);
-       }
-       
-       
        /**
         * Getter for instruction line number
         * @return lineNum Instruction approximate DML script line number

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java
index a63202a..1d192d5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
-import org.apache.sysml.api.MLContextProxy;
 import org.apache.sysml.lops.runtime.RunMRJobs;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -79,9 +78,6 @@ public abstract class CPInstruction extends Instruction
                        //note: no exchange of updated instruction as labels 
might change in the general case
                        String updInst = RunMRJobs.updateLabels(tmp.toString(), 
ec.getVariables());
                        tmp = 
CPInstructionParser.parseSingleInstruction(updInst);
-                       if(MLContextProxy.isActive()) {
-                               MLContextProxy.setInstructionForMonitoring(tmp);
-                       }
                }
 
                return tmp;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
index 53a1c4b..5660cb3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
@@ -19,11 +19,9 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import org.apache.sysml.api.MLContextProxy;
 import org.apache.sysml.lops.runtime.RunMRJobs;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.SPInstructionParser;
 import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -91,29 +89,7 @@ public abstract class SPInstruction extends Instruction
                        String updInst = RunMRJobs.updateLabels(tmp.toString(), 
ec.getVariables());
                        tmp = 
SPInstructionParser.parseSingleInstruction(updInst);
                }
-               
-
-               //spark-explain-specific handling of current instructions 
-               //This only relevant for ComputationSPInstruction as in 
postprocess we call setDebugString which is valid only for 
ComputationSPInstruction
-               Object mlCtxObj = MLContextProxy.getActiveMLContext();
-               if (mlCtxObj instanceof org.apache.sysml.api.MLContext) {
-                       org.apache.sysml.api.MLContext mlCtx = 
(org.apache.sysml.api.MLContext) mlCtxObj;
-                       if (tmp instanceof ComputationSPInstruction 
-                               && mlCtx != null && mlCtx.getMonitoringUtil() 
!= null 
-                               && ec instanceof SparkExecutionContext ) {
-                               
mlCtx.getMonitoringUtil().addCurrentInstruction((SPInstruction)tmp);
-                               MLContextProxy.setInstructionForMonitoring(tmp);
-                       }
-               } else if (mlCtxObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                       org.apache.sysml.api.mlcontext.MLContext mlCtx = 
(org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
-                       if (tmp instanceof ComputationSPInstruction 
-                               && mlCtx != null && 
mlCtx.getSparkMonitoringUtil() != null 
-                               && ec instanceof SparkExecutionContext ) {
-                               
mlCtx.getSparkMonitoringUtil().addCurrentInstruction((SPInstruction)tmp);
-                               MLContextProxy.setInstructionForMonitoring(tmp);
-                       }
-               }
-               
+                               
                return tmp;
        }
 
@@ -126,28 +102,6 @@ public abstract class SPInstruction extends Instruction
        public void postprocessInstruction(ExecutionContext ec)
                        throws DMLRuntimeException 
        {
-               //spark-explain-specific handling of current instructions
-               Object mlCtxObj = MLContextProxy.getActiveMLContext();
-               if (mlCtxObj instanceof org.apache.sysml.api.MLContext) {
-                       org.apache.sysml.api.MLContext mlCtx = 
(org.apache.sysml.api.MLContext) mlCtxObj;
-                       if (this instanceof ComputationSPInstruction 
-                                       && mlCtx != null && 
mlCtx.getMonitoringUtil() != null
-                                       && ec instanceof SparkExecutionContext 
) {
-                               SparkExecutionContext sec = 
(SparkExecutionContext) ec;
-                               sec.setDebugString(this, 
((ComputationSPInstruction) this).getOutputVariableName());
-                               
mlCtx.getMonitoringUtil().removeCurrentInstruction(this);
-                       }
-               } else if (mlCtxObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                       org.apache.sysml.api.mlcontext.MLContext mlCtx = 
(org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
-                       if (this instanceof ComputationSPInstruction 
-                                       && mlCtx != null && 
mlCtx.getSparkMonitoringUtil() != null
-                                       && ec instanceof SparkExecutionContext 
) {
-                               SparkExecutionContext sec = 
(SparkExecutionContext) ec;
-                               sec.setDebugString(this, 
((ComputationSPInstruction) this).getOutputVariableName());
-                               
mlCtx.getSparkMonitoringUtil().removeCurrentInstruction(this);
-                       }
-               }
-               
                //maintain statistics
                Statistics.incrementNoOfExecutedSPInst();
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
deleted file mode 100644
index edb2417..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.sysml.runtime.instructions.spark.functions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.spark.SparkContext;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.storage.RDDInfo;
-import org.apache.spark.ui.jobs.StagesTab;
-import org.apache.spark.ui.jobs.UIData.TaskUIData;
-import org.apache.spark.ui.scope.RDDOperationGraphListener;
-import org.apache.sysml.api.MLContextProxy;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
-
-import scala.Option;
-import scala.collection.Iterator;
-import scala.collection.Seq;
-import scala.xml.Node;
-
-// Instead of extending org.apache.spark.JavaSparkListener
-/**
- * This class is only used by MLContext for now. It is used to provide UI data 
for Python notebook.
- *
- */
-public class SparkListener extends RDDOperationGraphListener {
-       
-       
-       public SparkListener(SparkContext sc) {
-               super(sc.conf());
-               this._sc = sc;
-       }
-       
-       // protected SparkExecutionContext sec = null;
-       protected SparkContext _sc = null;
-       protected Set<SPInstruction> currentInstructions = new 
HashSet<SPInstruction>();
-       private HashMap<Integer, ArrayList<TaskUIData>> stageTaskMapping = new 
HashMap<Integer, ArrayList<TaskUIData>>();  
-       
-       public HashMap<Integer, Seq<Node>> stageDAGs = new HashMap<Integer, 
Seq<Node>>();
-       public HashMap<Integer, Seq<Node>> stageTimeline = new HashMap<Integer, 
Seq<Node>>();
-       public HashMap<Integer, Seq<Node>> jobDAGs = new HashMap<Integer, 
Seq<Node>>();
-       public HashMap<Integer, Long> stageExecutionTime = new HashMap<Integer, 
Long>();
-       public HashMap<Integer, ArrayList<Integer>> stageRDDMapping = new 
HashMap<Integer, ArrayList<Integer>>(); 
-       
-       public void addCurrentInstruction(SPInstruction inst) {
-               synchronized(currentInstructions) {
-                       currentInstructions.add(inst);
-               }
-       }
-       
-       public void removeCurrentInstruction(SPInstruction inst) {
-               synchronized(currentInstructions) {
-                       currentInstructions.remove(inst);
-               }
-       }
-
-       @Override
-       public void onExecutorMetricsUpdate(
-                       SparkListenerExecutorMetricsUpdate 
executorMetricsUpdate) {
-               super.onExecutorMetricsUpdate(executorMetricsUpdate);
-       }
-       
-       
-       @SuppressWarnings("deprecation")
-       @Override
-       public void onJobEnd(org.apache.spark.scheduler.SparkListenerJobEnd 
jobEnd) {
-               super.onJobEnd(jobEnd);
-               int jobID = jobEnd.jobId();
-               Seq<Node> jobNodes = 
org.apache.spark.ui.UIUtils.showDagVizForJob(jobID, 
this.getOperationGraphForJob(jobID));
-               jobDAGs.put(jobID, jobNodes);
-               synchronized(currentInstructions) {
-                       for(SPInstruction inst : currentInstructions) {
-                               Object mlContextObj = 
MLContextProxy.getActiveMLContext();
-                               if (mlContextObj != null) {
-                                       if (mlContextObj instanceof 
org.apache.sysml.api.MLContext) {
-                                               org.apache.sysml.api.MLContext 
mlContext = (org.apache.sysml.api.MLContext) mlContextObj;
-                                               if 
(mlContext.getMonitoringUtil() != null) {
-                                                       
mlContext.getMonitoringUtil().setJobId(inst, jobID);
-                                               }
-                                       } else if (mlContextObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                                               
org.apache.sysml.api.mlcontext.MLContext mlContext = 
(org.apache.sysml.api.mlcontext.MLContext) mlContextObj;
-                                               if 
(mlContext.getSparkMonitoringUtil() != null) {
-                                                       
mlContext.getSparkMonitoringUtil().setJobId(inst, jobID);
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       @SuppressWarnings("deprecation")
-       @Override
-       public void onStageSubmitted(SparkListenerStageSubmitted 
stageSubmitted) {
-               super.onStageSubmitted(stageSubmitted);
-               // stageSubmitted.stageInfo()
-               
-               Integer stageID = stageSubmitted.stageInfo().stageId();
-               synchronized(currentInstructions) {
-                       stageTaskMapping.put(stageID, new 
ArrayList<TaskUIData>());
-               }
-               
-               Option<org.apache.spark.ui.scope.RDDOperationGraph> rddOpGraph 
= 
Option.apply(org.apache.spark.ui.scope.RDDOperationGraph.makeOperationGraph(stageSubmitted.stageInfo()));
-               
-               Iterator<RDDInfo> iter = 
stageSubmitted.stageInfo().rddInfos().toList().toIterator();
-               ArrayList<Integer> rddIDs = new ArrayList<Integer>();
-               while(iter.hasNext()) {
-                       RDDInfo rddInfo = iter.next();
-                       rddIDs.add(rddInfo.id());
-               }
-               stageRDDMapping.put(stageSubmitted.stageInfo().stageId(), 
rddIDs);
-               
-               
-               Seq<Node> stageDAG = 
org.apache.spark.ui.UIUtils.showDagVizForStage(stageID, rddOpGraph);
-               stageDAGs.put(stageID, stageDAG);
-               
-               // Use org.apache.spark.ui.jobs.StagePage, 
org.apache.spark.ui.jobs.JobPage's makeTimeline method() to print timeline
-//             try {
-                       ArrayList<TaskUIData> taskUIData = 
stageTaskMapping.get(stageID);
-                       Seq<Node> currentStageTimeline = (new 
org.apache.spark.ui.jobs.StagePage(new StagesTab(_sc.ui().get())))
-                                       .makeTimeline(
-                                                       
scala.collection.JavaConversions.asScalaBuffer(taskUIData).toList(), 
-                                       System.currentTimeMillis());
-                       stageTimeline.put(stageID, currentStageTimeline);
-//             }
-//             catch(Exception e) {} // Ignore 
-               // Seq<RDDInfo> rddsInvolved = 
stageSubmitted.stageInfo().rddInfos();
-               
-               synchronized(currentInstructions) {
-                       for(SPInstruction inst : currentInstructions) {
-                               Object mlContextObj = 
MLContextProxy.getActiveMLContext();
-                               if (mlContextObj != null) {
-                                       if (mlContextObj instanceof 
org.apache.sysml.api.MLContext) {
-                                               org.apache.sysml.api.MLContext 
mlContext = (org.apache.sysml.api.MLContext) mlContextObj;
-                                               if 
(mlContext.getMonitoringUtil() != null) {
-                                                       
mlContext.getMonitoringUtil().setStageId(inst, 
stageSubmitted.stageInfo().stageId());
-                                               }
-                                       } else if (mlContextObj instanceof 
org.apache.sysml.api.mlcontext.MLContext) {
-                                               
org.apache.sysml.api.mlcontext.MLContext mlContext = 
(org.apache.sysml.api.mlcontext.MLContext) mlContextObj;
-                                               if 
(mlContext.getSparkMonitoringUtil() != null) {
-                                                       
mlContext.getSparkMonitoringUtil().setStageId(inst, 
stageSubmitted.stageInfo().stageId());
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       @Override
-       public void onTaskEnd(org.apache.spark.scheduler.SparkListenerTaskEnd 
taskEnd) {
-               Integer stageID = taskEnd.stageId();
-               
-               synchronized(currentInstructions) {
-                       if(stageTaskMapping.containsKey(stageID)) {
-                               //Option<String> errorMessage = 
Option.apply(null); // TODO
-                               //TaskUIData taskData = new 
TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), 
errorMessage);
-                               TaskUIData taskData = new 
TaskUIData(taskEnd.taskInfo(), null); //TODO
-                               stageTaskMapping.get(stageID).add(taskData);
-                       }
-                       else {
-                               // TODO: throw exception
-                       }
-               }
-       };
-       
-       @Override
-       public void onStageCompleted(SparkListenerStageCompleted 
stageCompleted) {
-               super.onStageCompleted(stageCompleted); 
-               try {
-                       long completionTime = 
Long.parseLong(stageCompleted.stageInfo().completionTime().get().toString());
-                       long submissionTime = 
Long.parseLong(stageCompleted.stageInfo().submissionTime().get().toString());
-                       
stageExecutionTime.put(stageCompleted.stageInfo().stageId(), 
completionTime-submissionTime);
-               }
-               catch(Exception e) {}
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
index a4ee2bb..0e826a3 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
@@ -48,9 +48,9 @@ public class DataFrameRowFrameConversionTest extends 
AutomatedTestBase
        private final static String TEST_NAME = "DataFrameConversion";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
DataFrameRowFrameConversionTest.class.getSimpleName() + "/";
 
-       private final static int  rows1 = 2245;
-       private final static int  cols1 = 745;
-       private final static int  cols2 = 1264;
+       private final static int  rows1 = 1045;
+       private final static int  cols1 = 545;
+       private final static int  cols2 = 864;
        private final static double sparsity1 = 0.9;
        private final static double sparsity2 = 0.1;
        private final static double eps=0.0000000001;
@@ -216,7 +216,9 @@ public class DataFrameRowFrameConversionTest extends 
AutomatedTestBase
                        //setup spark context
                        sec = (SparkExecutionContext) 
ExecutionContextFactory.createContext();          
                        JavaSparkContext sc = sec.getSparkContext();
+                       sc.getConf().set("spark.memory.offHeap.enabled", 
"false");
                        SQLContext sqlctx = new SQLContext(sc);
+                       sqlctx.setConf("spark.sql.codegen.wholeStage", "false");
                        
                        //get binary block input rdd
                        JavaPairRDD<Long,FrameBlock> in = 
SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);

Reply via email to