Repository: incubator-systemml Updated Branches: refs/heads/master 67f16c46e -> 1b4f1ec4d
[SYSTEMML-1228] Cleanup monitoring util and switch to Spark 2.1.0 Closes #376. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1b4f1ec4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1b4f1ec4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1b4f1ec4 Branch: refs/heads/master Commit: 1b4f1ec4d983b97afab2f144fded7c73f3923768 Parents: 67f16c4 Author: Niketan Pansare <[email protected]> Authored: Fri Feb 10 16:41:36 2017 -0800 Committer: Niketan Pansare <[email protected]> Committed: Fri Feb 10 16:42:18 2017 -0800 ---------------------------------------------------------------------- pom.xml | 2 +- .../java/org/apache/sysml/api/MLContext.java | 88 +-- .../org/apache/sysml/api/MLContextProxy.java | 38 -- .../apache/sysml/api/mlcontext/MLContext.java | 24 +- .../sysml/api/mlcontext/ScriptExecutor.java | 74 --- .../api/monitoring/InstructionComparator.java | 36 -- .../apache/sysml/api/monitoring/Location.java | 55 -- .../api/monitoring/SparkMonitoringUtil.java | 630 ------------------- .../context/SparkExecutionContext.java | 27 +- .../sysml/runtime/instructions/Instruction.java | 11 - .../runtime/instructions/cp/CPInstruction.java | 4 - .../instructions/spark/SPInstruction.java | 48 +- .../spark/functions/SparkListener.java | 199 ------ .../DataFrameRowFrameConversionTest.java | 8 +- 14 files changed, 10 insertions(+), 1234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f81557e..cd95e37 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ <properties> <hadoop.version>2.4.1</hadoop.version> <antlr.version>4.5.3</antlr.version> - <spark.version>2.0.2</spark.version> + <spark.version>2.1.0</spark.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> <scala.test.version>2.2.6</scala.test.version> http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index 42fb018..520e51e 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -41,7 +41,6 @@ import org.apache.spark.sql.SQLContext; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.jmlc.JMLCUtils; import org.apache.sysml.api.mlcontext.ScriptType; -import org.apache.sysml.api.monitoring.SparkMonitoringUtil; import org.apache.sysml.conf.CompilerConfig; import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.conf.ConfigurationManager; @@ -75,7 +74,6 @@ import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.functions.ConvertStringToLongTextPair; import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction; -import org.apache.sysml.runtime.instructions.spark.functions.SparkListener; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; @@ -207,19 +205,6 @@ public class MLContext { private Map<String, String> _additionalConfigs = new HashMap<String, String>(); - // -------------------------------------------------- - // _monitorUtils is set only when MLContext(sc, true) - private SparkMonitoringUtil _monitorUtils = null; - - /** - * Experimental API. Not supported in Python MLContext API. - * @return SparkMonitoringUtil the Spark monitoring util - */ - public SparkMonitoringUtil getMonitoringUtil() { - return _monitorUtils; - } - // -------------------------------------------------- - /** * Create an associated MLContext for given spark session. * @param sc SparkContext @@ -1219,56 +1204,6 @@ public class MLContext { } // -------------------------------- Utility methods ends ---------------------------------------------------------- - - - // -------------------------------- Experimental API begins ---------------------------------------------------------- - /** - * Experimental api: - * Setting monitorPerformance to true adds additional overhead of storing state. So, use it only if necessary. - * @param sc SparkContext - * @param monitorPerformance if true, monitor performance, otherwise false - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - public MLContext(SparkContext sc, boolean monitorPerformance) throws DMLRuntimeException { - initializeSpark(sc, monitorPerformance, false); - } - - /** - * Experimental api: - * Setting monitorPerformance to true adds additional overhead of storing state. So, use it only if necessary. - * @param sc JavaSparkContext - * @param monitorPerformance if true, monitor performance, otherwise false - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - public MLContext(JavaSparkContext sc, boolean monitorPerformance) throws DMLRuntimeException { - initializeSpark(sc.sc(), monitorPerformance, false); - } - - /** - * Experimental api: - * Setting monitorPerformance to true adds additional overhead of storing state. So, use it only if necessary. - * @param sc SparkContext - * @param monitorPerformance if true, monitor performance, otherwise false - * @param setForcedSparkExecType set forced spark exec type - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - public MLContext(SparkContext sc, boolean monitorPerformance, boolean setForcedSparkExecType) throws DMLRuntimeException { - initializeSpark(sc, monitorPerformance, setForcedSparkExecType); - } - - /** - * Experimental api: - * Setting monitorPerformance to true adds additional overhead of storing state. So, use it only if necessary. - * @param sc JavaSparkContext - * @param monitorPerformance if true, monitor performance, otherwise false - * @param setForcedSparkExecType set forced spark exec type - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ - public MLContext(JavaSparkContext sc, boolean monitorPerformance, boolean setForcedSparkExecType) throws DMLRuntimeException { - initializeSpark(sc.sc(), monitorPerformance, setForcedSparkExecType); - } - - // -------------------------------- Experimental API ends ---------------------------------------------------------- // -------------------------------- Private methods begins ---------------------------------------------------------- private boolean isRegisteredAsInput(String varName) { @@ -1347,20 +1282,8 @@ public class MLContext { DMLScript.rtplatform = RUNTIME_PLATFORM.SPARK; else DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; - - if(monitorPerformance) { - initializeSparkListener(sc); - } } - private void initializeSparkListener(SparkContext sc) throws DMLRuntimeException { - if(compareVersion(sc.version(), "1.4.0") < 0 ) { - throw new DMLRuntimeException("Expected spark version >= 1.4.0 for monitoring MLContext performance"); - } - SparkListener sparkListener = new SparkListener(sc); - _monitorUtils = new SparkMonitoringUtil(sparkListener); - sc.addSparkListener(sparkListener); - } /** * Execute a script stored in a string. @@ -1502,9 +1425,6 @@ public class MLContext { // Set active MLContext. _activeMLContext = this; - if(_monitorUtils != null) { - _monitorUtils.resetMonitoringData(); - } if( OptimizerUtils.isSparkExecutionMode() ) { // Depending on whether registerInput/registerOutput was called initialize the variables @@ -1589,9 +1509,6 @@ public class MLContext { //read dml script string String dmlScriptStr = DMLScript.readDMLScript( isFile?"-f":"-s", dmlScriptFilePath); - if(_monitorUtils != null) { - _monitorUtils.setDMLString(dmlScriptStr); - } //simplified compilation chain _rtprog = null; @@ -1653,9 +1570,6 @@ public class MLContext { //core execute runtime program _rtprog.execute( ec ); - if(_monitorUtils != null) - _monitorUtils.setExplainOutput(Explain.explain(_rtprog)); - return ec; } @@ -1680,4 +1594,4 @@ public class MLContext { MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); return MLMatrix.createMLMatrix(this, sqlContext, blocks, mcOut); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/MLContextProxy.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContextProxy.java b/src/main/java/org/apache/sysml/api/MLContextProxy.java index 88423b8..db87230 100644 --- a/src/main/java/org/apache/sysml/api/MLContextProxy.java +++ b/src/main/java/org/apache/sysml/api/MLContextProxy.java @@ -22,11 +22,9 @@ package org.apache.sysml.api; import java.util.ArrayList; import org.apache.sysml.api.mlcontext.MLContextException; -import org.apache.sysml.api.monitoring.Location; import org.apache.sysml.parser.Expression; import org.apache.sysml.parser.LanguageException; import org.apache.sysml.runtime.instructions.Instruction; -import org.apache.sysml.runtime.instructions.spark.SPInstruction; /** * The purpose of this proxy is to shield systemml internals from direct access to MLContext @@ -87,41 +85,5 @@ public class MLContextProxy throw new MLContextException("No MLContext object is currently active. Have you created one? " + "Hint: in Scala, 'val ml = new MLContext(sc)'", true); } - - @SuppressWarnings("deprecation") - public static void setInstructionForMonitoring(Instruction inst) { - Location loc = inst.getLocation(); - if (loc == null) { - return; - } - - if (org.apache.sysml.api.MLContext.getActiveMLContext() != null) { - org.apache.sysml.api.MLContext mlContext = org.apache.sysml.api.MLContext.getActiveMLContext(); - if(mlContext.getMonitoringUtil() != null) { - mlContext.getMonitoringUtil().setInstructionLocation(loc, inst); - } - } else if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) { - org.apache.sysml.api.mlcontext.MLContext mlContext = org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext(); - if(mlContext.getSparkMonitoringUtil() != null) { - mlContext.getSparkMonitoringUtil().setInstructionLocation(loc, inst); - } - } - } - - @SuppressWarnings("deprecation") - public static void addRDDForInstructionForMonitoring(SPInstruction inst, Integer rddID) { - - if (org.apache.sysml.api.MLContext.getActiveMLContext() != null) { - org.apache.sysml.api.MLContext mlContext = org.apache.sysml.api.MLContext.getActiveMLContext(); - if(mlContext.getMonitoringUtil() != null) { - mlContext.getMonitoringUtil().addRDDForInstruction(inst, rddID); - } - } else if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) { - org.apache.sysml.api.mlcontext.MLContext mlContext = org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext(); - if(mlContext.getSparkMonitoringUtil() != null) { - mlContext.getSparkMonitoringUtil().addRDDForInstruction(inst, rddID); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java index 17cb92b..48f013e 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContext.java @@ -32,7 +32,6 @@ import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.MLContextProxy; import org.apache.sysml.api.jmlc.JMLCUtils; -import org.apache.sysml.api.monitoring.SparkMonitoringUtil; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.parser.DataExpression; @@ -46,7 +45,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.Instruction; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.instructions.cp.ScalarObject; -import org.apache.sysml.runtime.instructions.spark.functions.SparkListener; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.utils.Explain.ExplainType; @@ -68,11 +66,6 @@ public class MLContext { private SparkContext sc = null; /** - * SparkMonitoringUtil monitors SystemML performance on Spark. - */ - private SparkMonitoringUtil sparkMonitoringUtil = null; - - /** * Reference to the currently executing script. */ private Script executingScript = null; @@ -232,12 +225,6 @@ public class MLContext { MLContextUtil.setDefaultConfig(); MLContextUtil.setCompilerConfig(); - - if (monitorPerformance) { - SparkListener sparkListener = new SparkListener(sc); - sparkMonitoringUtil = new SparkMonitoringUtil(sparkListener); - sc.addSparkListener(sparkListener); - } } /** @@ -273,7 +260,7 @@ public class MLContext { * @return the results as a MLResults object */ public MLResults execute(Script script) { - ScriptExecutor scriptExecutor = new ScriptExecutor(sparkMonitoringUtil); + ScriptExecutor scriptExecutor = new ScriptExecutor(); scriptExecutor.setExplain(explain); scriptExecutor.setExplainLevel(explainLevel); scriptExecutor.setStatistics(statistics); @@ -326,15 +313,6 @@ public class MLContext { } /** - * Obtain the SparkMonitoringUtil if it is available. - * - * @return the SparkMonitoringUtil if it is available. - */ - public SparkMonitoringUtil getSparkMonitoringUtil() { - return sparkMonitoringUtil; - } - - /** * Obtain the SparkContext associated with this MLContext. * * @return the SparkContext associated with this MLContext. http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java b/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java index 734475e..5ee8622 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/ScriptExecutor.java @@ -27,7 +27,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.jmlc.JMLCUtils; import org.apache.sysml.api.mlcontext.MLContext.ExplainLevel; -import org.apache.sysml.api.monitoring.SparkMonitoringUtil; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.HopsException; @@ -108,7 +107,6 @@ import org.apache.sysml.utils.Statistics; public class ScriptExecutor { protected DMLConfig config; - protected SparkMonitoringUtil sparkMonitoringUtil; protected DMLProgram dmlProgram; protected DMLTranslator dmlTranslator; protected Program runtimeProgram; @@ -141,32 +139,6 @@ public class ScriptExecutor { } /** - * ScriptExecutor constructor, where a SparkMonitoringUtil object is passed - * in. - * - * @param sparkMonitoringUtil - * SparkMonitoringUtil object to monitor Spark - */ - public ScriptExecutor(SparkMonitoringUtil sparkMonitoringUtil) { - this(); - this.sparkMonitoringUtil = sparkMonitoringUtil; - } - - /** - * ScriptExecutor constructor, where the configuration properties and a - * SparkMonitoringUtil object are passed in. - * - * @param config - * the configuration properties to use by the ScriptExecutor - * @param sparkMonitoringUtil - * SparkMonitoringUtil object to monitor Spark - */ - public ScriptExecutor(DMLConfig config, SparkMonitoringUtil sparkMonitoringUtil) { - this.config = config; - this.sparkMonitoringUtil = sparkMonitoringUtil; - } - - /** * Construct DAGs of high-level operators (HOPs) for each block of * statements. */ @@ -318,7 +290,6 @@ public class ScriptExecutor { cleanupRuntimeProgram(); createAndInitializeExecutionContext(); executeRuntimeProgram(); - setExplainRuntimeProgramInSparkMonitor(); cleanupAfterExecution(); // add symbol table to MLResults @@ -344,7 +315,6 @@ public class ScriptExecutor { this.script = script; checkScriptHasTypeAndString(); script.setScriptExecutor(this); - setScriptStringInSparkMonitor(); // Set global variable indicating the script type DMLScript.SCRIPT_TYPE = script.getScriptType(); } @@ -403,15 +373,6 @@ public class ScriptExecutor { } /** - * Obtain the SparkMonitoringUtil object. - * - * @return the SparkMonitoringUtil object, if available - */ - public SparkMonitoringUtil getSparkMonitoringUtil() { - return sparkMonitoringUtil; - } - - /** * Check security, create scratch space, cleanup working directories, * initialize caching, and reset statistics. */ @@ -500,41 +461,6 @@ public class ScriptExecutor { } /** - * Set the explanation of the runtime program in the SparkMonitoringUtil if - * it exists. - */ - protected void setExplainRuntimeProgramInSparkMonitor() { - if (sparkMonitoringUtil != null) { - try { - String explainOutput = Explain.explain(runtimeProgram); - sparkMonitoringUtil.setExplainOutput(explainOutput); - } catch (HopsException e) { - throw new MLContextException("Exception occurred while explaining runtime program", e); - } - } - - } - - /** - * Set the script string in the SparkMonitoringUtil if it exists. - */ - protected void setScriptStringInSparkMonitor() { - if (sparkMonitoringUtil != null) { - sparkMonitoringUtil.setDMLString(script.getScriptString()); - } - } - - /** - * Set the SparkMonitoringUtil object. - * - * @param sparkMonitoringUtil - * The SparkMonitoringUtil object - */ - public void setSparkMonitoringUtil(SparkMonitoringUtil sparkMonitoringUtil) { - this.sparkMonitoringUtil = sparkMonitoringUtil; - } - - /** * Liveness analysis is performed on the program, obtaining sets of live-in * and live-out variables by forward and backward passes over the program. */ http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java b/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java deleted file mode 100644 index 485ada4..0000000 --- a/src/main/java/org/apache/sysml/api/monitoring/InstructionComparator.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sysml.api.monitoring; - -import java.util.Comparator; -import java.util.HashMap; - -public class InstructionComparator implements Comparator<String>{ - - HashMap<String, Long> instructionCreationTime; - public InstructionComparator(HashMap<String, Long> instructionCreationTime) { - this.instructionCreationTime = instructionCreationTime; - } - - @Override - public int compare(String o1, String o2) { - return instructionCreationTime.get(o1) - .compareTo(instructionCreationTime.get(o2)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/Location.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/monitoring/Location.java b/src/main/java/org/apache/sysml/api/monitoring/Location.java deleted file mode 100644 index ddc9749..0000000 --- a/src/main/java/org/apache/sysml/api/monitoring/Location.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sysml.api.monitoring; - -public class Location implements Comparable<Location> { - public int beginLine; - public int endLine; - public int beginCol; - public int endCol; - public Location(int beginLine, int endLine, int beginCol, int endCol) { - this.beginLine = beginLine; - this.endLine = endLine; - this.beginCol = beginCol; - this.endCol = endCol; - } - - @Override - public boolean equals(Object other) { - if(!( other instanceof Location )) - return false; - Location loc = (Location) other; - return loc.beginLine == beginLine && loc.endLine == endLine - && loc.beginCol == beginCol && loc.endCol == endCol; - } - - public String toString() { - return beginLine + ":" + beginCol + ", " + endLine + ":" + endCol; - } - - @Override - public int compareTo(Location loc) { - int ret1 = Integer.compare(loc.beginLine, beginLine); - int ret2 = Integer.compare(loc.endLine, endLine); - int ret3 = Integer.compare(loc.beginCol, beginCol); - int ret4 = Integer.compare(loc.endCol, endCol); - - return (ret1 != 0) ? ret1 : (ret2 != 0) ? ret2 : (ret3 != 0) ? ret3 : ret4; - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java b/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java deleted file mode 100644 index 42a1f32..0000000 --- a/src/main/java/org/apache/sysml/api/monitoring/SparkMonitoringUtil.java +++ /dev/null @@ -1,630 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sysml.api.monitoring; - -import java.io.BufferedWriter; -import java.io.FileWriter; -import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.apache.sysml.lops.Lop; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.instructions.Instruction; -import org.apache.sysml.runtime.instructions.spark.SPInstruction; -import org.apache.sysml.runtime.instructions.spark.functions.SparkListener; - -import scala.collection.Seq; -import scala.xml.Node; - -/** - * Usage guide: - * MLContext mlCtx = new MLContext(sc, true); - * mlCtx.register... - * mlCtx.execute(...) - * mlCtx.getMonitoringUtil().getRuntimeInfoInHTML("runtime.html"); - */ -public class SparkMonitoringUtil { - private HashMap<String, String> lineageInfo = new HashMap<String, String>(); // instruction -> lineageInfo - private HashMap<String, Long> instructionCreationTime = new HashMap<String, Long>(); - private MultiMap<Location, String> instructions = new MultiMap<Location, String>(); - private MultiMap<String, Integer> stageIDs = new MultiMap<String, Integer>(); - private MultiMap<String, Integer> jobIDs = new MultiMap<String, Integer>(); - private MultiMap<Integer, String> rddInstructionMapping = new MultiMap<Integer, String>(); - - private HashSet<String> getRelatedInstructions(int stageID) { - HashSet<String> retVal = new HashSet<String>(); - if(_sparkListener != null) { - ArrayList<Integer> rdds = _sparkListener.stageRDDMapping.get(stageID); - for(Integer rddID : rdds) { - retVal.addAll(rddInstructionMapping.get(rddID)); - } - } - return retVal; - } - - private SparkListener _sparkListener = null; - public SparkListener getSparkListener() { - return _sparkListener; - } - - private String explainOutput = ""; - - public String getExplainOutput() { - return explainOutput; - } - - public void setExplainOutput(String explainOutput) { - this.explainOutput = explainOutput; - } - - public SparkMonitoringUtil(SparkListener sparkListener) { - _sparkListener = sparkListener; - } - - public void addCurrentInstruction(SPInstruction inst) { - if(_sparkListener != null) { - _sparkListener.addCurrentInstruction(inst); - } - } - - public void addRDDForInstruction(SPInstruction inst, Integer rddID) { - this.rddInstructionMapping.put(rddID, getInstructionString(inst)); - } - - public void removeCurrentInstruction(SPInstruction inst) { - if(_sparkListener != null) { - _sparkListener.removeCurrentInstruction(inst); - } - } - - public void setDMLString(String dmlStr) { - this.dmlStrForMonitoring = dmlStr; - } - - public void resetMonitoringData() { - if(_sparkListener != null && _sparkListener.stageDAGs != null) - _sparkListener.stageDAGs.clear(); - if(_sparkListener != null && _sparkListener.stageTimeline != null) - _sparkListener.stageTimeline.clear(); - } - - // public Multimap<Location, String> hops = ArrayListMultimap.create(); TODO: - private String dmlStrForMonitoring = null; - public void getRuntimeInfoInHTML(String htmlFilePath) throws DMLRuntimeException, IOException { - String jsAndCSSFiles = "<script src=\"js/lodash.min.js\"></script>" - + "<script src=\"js/jquery-1.11.1.min.js\"></script>" - + "<script src=\"js/d3.min.js\"></script>" - + "<script src=\"js/bootstrap-tooltip.js\"></script>" - + "<script src=\"js/dagre-d3.min.js\"></script>" - + "<script src=\"js/graphlib-dot.min.js\"></script>" - + "<script src=\"js/spark-dag-viz.js\"></script>" - + "<script src=\"js/timeline-view.js\"></script>" - + "<script src=\"js/vis.min.js\"></script>" - + "<link rel=\"stylesheet\" href=\"css/bootstrap.min.css\">" - + "<link rel=\"stylesheet\" href=\"css/vis.min.css\">" - + "<link rel=\"stylesheet\" href=\"css/spark-dag-viz.css\">" - + "<link rel=\"stylesheet\" href=\"css/timeline-view.css\"> "; - BufferedWriter bw = new BufferedWriter(new FileWriter(htmlFilePath)); - bw.write("<html><head>\n"); - bw.write(jsAndCSSFiles + "\n"); - bw.write("</head><body>\n<table border=1>\n"); - - bw.write("<tr>\n"); - bw.write("<td><b>Position in script</b></td>\n"); - bw.write("<td><b>DML</b></td>\n"); - bw.write("<td><b>Instruction</b></td>\n"); - bw.write("<td><b>StageIDs</b></td>\n"); - bw.write("<td><b>RDD Lineage</b></td>\n"); - bw.write("</tr>\n"); - - for(Location loc : instructions.keySet()) { - String dml = getExpression(loc); - - // Sort the instruction with time - so as to separate recompiled instructions - List<String> listInst = new ArrayList<String>(instructions.get(loc)); - Collections.sort(listInst, new InstructionComparator(instructionCreationTime)); - - if(dml != null && dml.trim().length() > 1) { - bw.write("<tr>\n"); - int rowSpan = listInst.size(); - bw.write("<td rowspan=\"" + rowSpan + "\">" + loc.toString() + "</td>\n"); - bw.write("<td rowspan=\"" + rowSpan + "\">" + dml + "</td>\n"); - boolean firstTime = true; - for(String inst : listInst) { - if(!firstTime) - bw.write("<tr>\n"); - - if(inst.startsWith("SPARK")) - bw.write("<td style=\"color:red\">" + inst + "</td>\n"); - else if(isInterestingCP(inst)) - bw.write("<td style=\"color:blue\">" + inst + "</td>\n"); - else - bw.write("<td>" + inst + "</td>\n"); - - bw.write("<td>" + getStageIDAsString(inst) + "</td>\n"); - if(lineageInfo.containsKey(inst)) - bw.write("<td>" + lineageInfo.get(inst).replaceAll("\n", "<br />") + "</td>\n"); - else - bw.write("<td></td>\n"); - - bw.write("</tr>\n"); - firstTime = false; - } - - } - - } - - bw.write("</table></body>\n</html>"); - bw.close(); - } - - private String getInQuotes(String str) { - return "\"" + str + "\""; - } - private String getEscapedJSON(String json) { - if(json == null) - return ""; - else { - return json - //.replaceAll("\\\\", "\\\\\\") - .replaceAll("\\t", "\\\\t") - .replaceAll("/", "\\\\/") - .replaceAll("\"", "\\\\\"") - .replaceAll("\\r?\\n", "\\\\n"); - } - } - - private long maxExpressionExecutionTime = 0; - HashMap<Integer, Long> stageExecutionTimes = new HashMap<Integer, Long>(); - HashMap<String, Long> expressionExecutionTimes = new HashMap<String, Long>(); - HashMap<String, Long> instructionExecutionTimes = new HashMap<String, Long>(); - HashMap<Integer, HashSet<String>> relatedInstructionsPerStage = new HashMap<Integer, HashSet<String>>(); - private void fillExecutionTimes() { - stageExecutionTimes.clear(); - expressionExecutionTimes.clear(); - for(Location loc : instructions.keySet()) { - List<String> listInst = new ArrayList<String>(instructions.get(loc)); - long expressionExecutionTime = 0; - - for(String inst : listInst) { - long instructionExecutionTime = 0; - for(Integer stageId : stageIDs.get(inst)) { - try { - if(getStageExecutionTime(stageId) != null) { - long stageExecTime = getStageExecutionTime(stageId); - instructionExecutionTime += stageExecTime; - expressionExecutionTime += stageExecTime; - stageExecutionTimes.put(stageId, stageExecTime); - } - } - catch(Exception e) {} - - relatedInstructionsPerStage.put(stageId, getRelatedInstructions(stageId)); - } - instructionExecutionTimes.put(inst, instructionExecutionTime); - } - expressionExecutionTime /= listInst.size(); // average - maxExpressionExecutionTime = Math.max(maxExpressionExecutionTime, expressionExecutionTime); - expressionExecutionTimes.put(loc.toString(), expressionExecutionTime); - } - - // Now fill empty instructions - for(Entry<String, Long> kv : instructionExecutionTimes.entrySet()) { - if(kv.getValue() == 0) { - // Find all stages that contain this as related instruction - long sumExecutionTime = 0; - for(Entry<Integer, HashSet<String>> kv1 : relatedInstructionsPerStage.entrySet()) { - if(kv1.getValue().contains(kv.getKey())) { - sumExecutionTime += stageExecutionTimes.get(kv1.getKey()); - } - } - kv.setValue(sumExecutionTime); - } - } - - for(Location loc : instructions.keySet()) { - if(expressionExecutionTimes.get(loc.toString()) == 0) { - List<String> listInst = new ArrayList<String>(instructions.get(loc)); - long expressionExecutionTime = 0; - for(String inst : listInst) { - expressionExecutionTime += instructionExecutionTimes.get(inst); - } - expressionExecutionTime /= listInst.size(); // average - maxExpressionExecutionTime = Math.max(maxExpressionExecutionTime, expressionExecutionTime); - expressionExecutionTimes.put(loc.toString(), expressionExecutionTime); - } - } - - } - - public String getRuntimeInfoInJSONFormat() throws DMLRuntimeException, IOException { - StringBuilder retVal = new StringBuilder("{\n"); - - retVal.append(getInQuotes("dml") + ":" + getInQuotes(getEscapedJSON(dmlStrForMonitoring)) + ",\n"); - retVal.append(getInQuotes("expressions") + ":" + "[\n"); - - boolean isFirstExpression = true; - fillExecutionTimes(); - - for(Location loc : instructions.keySet()) { - String dml = getEscapedJSON(getExpressionInJSON(loc)); - - if(dml != null) { - // Sort the instruction with time - so as to separate recompiled instructions - List<String> listInst = new ArrayList<String>(instructions.get(loc)); - Collections.sort(listInst, new InstructionComparator(instructionCreationTime)); - - if(!isFirstExpression) { - retVal.append(",\n"); - } - retVal.append("{\n"); - isFirstExpression = false; - - retVal.append(getInQuotes("beginLine") + ":" + loc.beginLine + ",\n"); - retVal.append(getInQuotes("beginCol") + ":" + loc.beginCol + ",\n"); - retVal.append(getInQuotes("endLine") + ":" + loc.endLine + ",\n"); - retVal.append(getInQuotes("endCol") + ":" + loc.endCol + ",\n"); - - long expressionExecutionTime = expressionExecutionTimes.get(loc.toString()); - retVal.append(getInQuotes("expressionExecutionTime") + ":" + expressionExecutionTime + ",\n"); - retVal.append(getInQuotes("expressionHeavyHitterFactor") + ":" + ((double)expressionExecutionTime / (double)maxExpressionExecutionTime) + ",\n"); - - retVal.append(getInQuotes("expression") + ":" + getInQuotes(dml) + ",\n"); - - retVal.append(getInQuotes("instructions") + ":" + "[\n"); - - boolean firstTime = true; - for(String inst : listInst) { - - if(!firstTime) - retVal.append(", {"); - else - retVal.append("{"); - - if(inst.startsWith("SPARK")) { - retVal.append(getInQuotes("isSpark") + ":" + "true,\n"); - } - else if(isInterestingCP(inst)) { - retVal.append(getInQuotes("isInteresting") + ":" + "true,\n"); - } - - retVal.append(getStageIDAsJSONString(inst) + "\n"); - if(lineageInfo.containsKey(inst)) { - retVal.append(getInQuotes("lineageInfo") + ":" + getInQuotes(getEscapedJSON(lineageInfo.get(inst))) + ",\n"); - } - - retVal.append(getInQuotes("instruction") + ":" + getInQuotes(getEscapedJSON(inst))); - retVal.append("}"); - firstTime = false; - } - - retVal.append("]\n"); - retVal.append("}\n"); - } - - } - - return retVal.append("]\n}").toString(); - } - - - private boolean isInterestingCP(String inst) { - if(inst.startsWith("CP rmvar") || inst.startsWith("CP cpvar") || inst.startsWith("CP mvvar")) - return false; - else if(inst.startsWith("CP")) - return true; - else - return false; - } - - private String getStageIDAsString(String instruction) { - String retVal = ""; - for(Integer stageId : stageIDs.get(instruction)) { - String stageDAG = ""; - String stageTimeLine = ""; - - if(getStageDAGs(stageId) != null) { - stageDAG = getStageDAGs(stageId).toString(); - } - - if(getStageTimeLine(stageId) != null) { - stageTimeLine = getStageTimeLine(stageId).toString(); - } - - retVal += "Stage:" + stageId + - " (" - + "<div>" - + stageDAG.replaceAll("toggleDagViz\\(false\\)", "toggleDagViz(false, this)") - + "</div>, " - + "<div id=\"timeline-" + stageId + "\">" - + stageTimeLine - .replaceAll("drawTaskAssignmentTimeline\\(", "registerTimelineData(" + stageId + ", ") - .replaceAll("class=\"expand-task-assignment-timeline\"", "class=\"expand-task-assignment-timeline\" onclick=\"toggleStageTimeline(this)\"") - + "</div>" - + ")"; - } - return retVal; - } - - private String getStageIDAsJSONString(String instruction) { - long instructionExecutionTime = instructionExecutionTimes.get(instruction); - - StringBuilder retVal = new StringBuilder(getInQuotes("instructionExecutionTime") + ":" + instructionExecutionTime + ",\n"); - - boolean isFirst = true; - if(stageIDs.get(instruction).size() == 0) { - // Find back references - HashSet<Integer> relatedStages = new HashSet<Integer>(); - for(Entry<Integer, HashSet<String>> kv : relatedInstructionsPerStage.entrySet()) { - if(kv.getValue().contains(instruction)) { - relatedStages.add(kv.getKey()); - } - } - HashSet<String> relatedInstructions = new HashSet<String>(); - for(Entry<String, Integer> kv : stageIDs.entries()) { - if(relatedStages.contains(kv.getValue())) { - relatedInstructions.add(kv.getKey()); - } - } - - retVal.append(getInQuotes("backReferences") + ": [\n"); - boolean isFirstRelInst = true; - for(String relInst : relatedInstructions) { - if(!isFirstRelInst) { - retVal.append(",\n"); - } - retVal.append(getInQuotes(relInst)); - isFirstRelInst = false; - } - retVal.append("], \n"); - } - else { - retVal.append(getInQuotes("stages") + ": {"); - for(Integer stageId : stageIDs.get(instruction)) { - String stageDAG = ""; - String stageTimeLine = ""; - - if(getStageDAGs(stageId) != null) { - stageDAG = getStageDAGs(stageId).toString(); - } - - if(getStageTimeLine(stageId) != null) { - stageTimeLine = getStageTimeLine(stageId).toString(); - } - - long stageExecutionTime = stageExecutionTimes.get(stageId); - if(!isFirst) { - retVal.append(",\n"); - } - - retVal.append(getInQuotes("" + stageId) + ": {"); - - // Now add related instructions - HashSet<String> relatedInstructions = relatedInstructionsPerStage.get(stageId); - - retVal.append(getInQuotes("relatedInstructions") + ": [\n"); - boolean isFirstRelInst = true; - for(String relInst : relatedInstructions) { - if(!isFirstRelInst) { - retVal.append(",\n"); - } - retVal.append(getInQuotes(relInst)); - isFirstRelInst = false; - } - retVal.append("],\n"); - - retVal.append(getInQuotes("DAG") + ":") - .append( - getInQuotes( - getEscapedJSON(stageDAG.replaceAll("toggleDagViz\\(false\\)", "toggleDagViz(false, this)")) - ) + ",\n" - ) - .append(getInQuotes("stageExecutionTime") + ":" + stageExecutionTime + ",\n") - .append(getInQuotes("timeline") + ":") - .append( - getInQuotes( - getEscapedJSON( - stageTimeLine - .replaceAll("drawTaskAssignmentTimeline\\(", "registerTimelineData(" + stageId + ", ") - .replaceAll("class=\"expand-task-assignment-timeline\"", "class=\"expand-task-assignment-timeline\" onclick=\"toggleStageTimeline(this)\"")) - ) - ) - .append("}"); - - isFirst = false; - } - retVal.append("}, "); - } - - - retVal.append(getInQuotes("jobs") + ": {"); - isFirst = true; - for(Integer jobId : jobIDs.get(instruction)) { - String jobDAG = ""; - - if(getJobDAGs(jobId) != null) { - jobDAG = getJobDAGs(jobId).toString(); - } - if(!isFirst) { - retVal.append(",\n"); - } - - retVal.append(getInQuotes("" + jobId) + ": {") - .append(getInQuotes("DAG") + ":" ) - .append(getInQuotes( - getEscapedJSON(jobDAG.replaceAll("toggleDagViz\\(true\\)", "toggleDagViz(true, this)")) - ) + "}\n"); - - isFirst = false; - } - retVal.append("}, "); - - return retVal.toString(); - } - - - String [] dmlLines = null; - private String getExpression(Location loc) { - try { - if(dmlLines == null) { - dmlLines = dmlStrForMonitoring.split("\\r?\\n"); - } - if(loc.beginLine == loc.endLine) { - return dmlLines[loc.beginLine-1].substring(loc.beginCol-1, loc.endCol); - } - else { - String retVal = dmlLines[loc.beginLine-1].substring(loc.beginCol-1); - for(int i = loc.beginLine+1; i < loc.endLine; i++) { - retVal += "<br />" + dmlLines[i-1]; - } - retVal += "<br />" + dmlLines[loc.endLine-1].substring(0, loc.endCol); - return retVal; - } - } - catch(Exception e) { - return null; // "[[" + loc.beginLine + "," + loc.endLine + "," + loc.beginCol + "," + loc.endCol + "]]"; - } - } - - - private String getExpressionInJSON(Location loc) { - try { - if(dmlLines == null) { - dmlLines = dmlStrForMonitoring.split("\\r?\\n"); - } - if(loc.beginLine == loc.endLine) { - return dmlLines[loc.beginLine-1].substring(loc.beginCol-1, loc.endCol); - } - else { - String retVal = dmlLines[loc.beginLine-1].substring(loc.beginCol-1); - for(int i = loc.beginLine+1; i < loc.endLine; i++) { - retVal += "\\n" + dmlLines[i-1]; - } - retVal += "\\n" + dmlLines[loc.endLine-1].substring(0, loc.endCol); - return retVal; - } - } - catch(Exception e) { - return null; // "[[" + loc.beginLine + "," + loc.endLine + "," + loc.beginCol + "," + loc.endCol + "]]"; - } - } - - public Seq<Node> getStageDAGs(int stageIDs) { - if(_sparkListener == null || _sparkListener.stageDAGs == null) - return null; - else - return _sparkListener.stageDAGs.get(stageIDs); - } - - public Long getStageExecutionTime(int stageID) { - if(_sparkListener == null || _sparkListener.stageDAGs == null) - return null; - else - return _sparkListener.stageExecutionTime.get(stageID); - } - - public Seq<Node> getJobDAGs(int jobID) { - if(_sparkListener == null || _sparkListener.jobDAGs == null) - return null; - else - return _sparkListener.jobDAGs.get(jobID); - } - - public Seq<Node> getStageTimeLine(int stageIDs) { - if(_sparkListener == null || _sparkListener.stageTimeline == null) - return null; - else - return _sparkListener.stageTimeline.get(stageIDs); - } - public void setLineageInfo(Instruction inst, String plan) { - lineageInfo.put(getInstructionString(inst), plan); - } - public void setStageId(Instruction inst, int stageId) { - stageIDs.put(getInstructionString(inst), stageId); - } - public void setJobId(Instruction inst, int jobId) { - jobIDs.put(getInstructionString(inst), jobId); - } - public void setInstructionLocation(Location loc, Instruction inst) { - String instStr = getInstructionString(inst); - instructions.put(loc, instStr); - instructionCreationTime.put(instStr, System.currentTimeMillis()); - } - private String getInstructionString(Instruction inst) { - String tmp = inst.toString(); - tmp = tmp.replaceAll(Lop.OPERAND_DELIMITOR, " "); - tmp = tmp.replaceAll(Lop.DATATYPE_PREFIX, "."); - tmp = tmp.replaceAll(Lop.INSTRUCTION_DELIMITOR, ", "); - return tmp; - } - - public class MultiMap<K, V extends Comparable<V>> { - private SortedMap<K, List<V>> m = new TreeMap<K, List<V>>(); - - public MultiMap(){ - } - - public void put(K key, V value) { - List<V> list; - if (!m.containsKey(key)) { - list = new ArrayList<V>(); - m.put(key, list); - } else { - list = m.get(key); - } - list.add(value); - Collections.sort(list); - } - - public Collection<Entry<K, V>> entries() { - // the treemap is sorted and the lists are sorted, so can traverse - // to generate a key/value ordered list of all entries. - Collection<Entry<K, V>> allEntries = new ArrayList<Entry<K, V>>(); - - for (K key : m.keySet()) { - List<V> list = m.get(key); - for (V value : list) { - Entry<K, V> listEntry = new SimpleEntry<K, V>(key, value); - allEntries.add(listEntry); - } - } - - return allEntries; - } - - public List<V> get(K key) { - return m.get(key); - } - - public Set<K> keySet() { - return m.keySet(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 8e5e4dc..bbc9fb3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -1263,9 +1263,7 @@ public class SparkExecutionContext extends ExecutionContext if( parentLineage == null || parentLineage.getRDD() == null ) return; - - MLContextProxy.addRDDForInstructionForMonitoring(inst, parentLineage.getRDD().id()); - + JavaPairRDD<?, ?> out = parentLineage.getRDD(); JavaPairRDD<?, ?> in1 = null; JavaPairRDD<?, ?> in2 = null; @@ -1344,29 +1342,6 @@ public class SparkExecutionContext extends ExecutionContext } } - - Object mlContextObj = MLContextProxy.getActiveMLContext(); - if (mlContextObj != null) { - if (mlContextObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlContextObj; - if (mlCtx.getMonitoringUtil() != null) { - mlCtx.getMonitoringUtil().setLineageInfo(inst, outDebugString); - } else { - throw new DMLRuntimeException("The method setLineageInfoForExplain should be called only through MLContext"); - } - } else if (mlContextObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlContextObj; - if (mlCtx.getSparkMonitoringUtil() != null) { - mlCtx.getSparkMonitoringUtil().setLineageInfo(inst, outDebugString); - } else { - throw new DMLRuntimeException("The method setLineageInfoForExplain should be called only through MLContext"); - } - } - - } else { - throw new DMLRuntimeException("The method setLineageInfoForExplain should be called only through MLContext"); - } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java index fae12ee..0681f14 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java @@ -22,7 +22,6 @@ package org.apache.sysml.runtime.instructions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysml.api.monitoring.Location; import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.DataIdentifier; import org.apache.sysml.runtime.DMLRuntimeException; @@ -112,16 +111,6 @@ public abstract class Instruction } } - public Location getLocation() { - // Rather than exposing 4 different getter methods. Also Location doesnot contain any references to Spark libraries - if(beginLine == -1 || endLine == -1 || beginCol == -1 || endCol == -1) { - return null; - } - else - return new Location(beginLine, endLine, beginCol, endCol); - } - - /** * Getter for instruction line number * @return lineNum Instruction approximate DML script line number http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java index a63202a..1d192d5 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CPInstruction.java @@ -19,7 +19,6 @@ package org.apache.sysml.runtime.instructions.cp; -import org.apache.sysml.api.MLContextProxy; import org.apache.sysml.lops.runtime.RunMRJobs; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; @@ -79,9 +78,6 @@ public abstract class CPInstruction extends Instruction //note: no exchange of updated instruction as labels might change in the general case String updInst = RunMRJobs.updateLabels(tmp.toString(), ec.getVariables()); tmp = CPInstructionParser.parseSingleInstruction(updInst); - if(MLContextProxy.isActive()) { - MLContextProxy.setInstructionForMonitoring(tmp); - } } return tmp; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java index 53a1c4b..5660cb3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java @@ -19,11 +19,9 @@ package org.apache.sysml.runtime.instructions.spark; -import org.apache.sysml.api.MLContextProxy; import org.apache.sysml.lops.runtime.RunMRJobs; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.Instruction; import org.apache.sysml.runtime.instructions.SPInstructionParser; import org.apache.sysml.runtime.matrix.operators.Operator; @@ -91,29 +89,7 @@ public abstract class SPInstruction extends Instruction String updInst = RunMRJobs.updateLabels(tmp.toString(), ec.getVariables()); tmp = SPInstructionParser.parseSingleInstruction(updInst); } - - - //spark-explain-specific handling of current instructions - //This only relevant for ComputationSPInstruction as in postprocess we call setDebugString which is valid only for ComputationSPInstruction - Object mlCtxObj = MLContextProxy.getActiveMLContext(); - if (mlCtxObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlCtxObj; - if (tmp instanceof ComputationSPInstruction - && mlCtx != null && mlCtx.getMonitoringUtil() != null - && ec instanceof SparkExecutionContext ) { - mlCtx.getMonitoringUtil().addCurrentInstruction((SPInstruction)tmp); - MLContextProxy.setInstructionForMonitoring(tmp); - } - } else if (mlCtxObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj; - if (tmp instanceof ComputationSPInstruction - && mlCtx != null && mlCtx.getSparkMonitoringUtil() != null - && ec instanceof SparkExecutionContext ) { - mlCtx.getSparkMonitoringUtil().addCurrentInstruction((SPInstruction)tmp); - MLContextProxy.setInstructionForMonitoring(tmp); - } - } - + return tmp; } @@ -126,28 +102,6 @@ public abstract class SPInstruction extends Instruction public void postprocessInstruction(ExecutionContext ec) throws DMLRuntimeException { - //spark-explain-specific handling of current instructions - Object mlCtxObj = MLContextProxy.getActiveMLContext(); - if (mlCtxObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlCtxObj; - if (this instanceof ComputationSPInstruction - && mlCtx != null && mlCtx.getMonitoringUtil() != null - && ec instanceof SparkExecutionContext ) { - SparkExecutionContext sec = (SparkExecutionContext) ec; - sec.setDebugString(this, ((ComputationSPInstruction) this).getOutputVariableName()); - mlCtx.getMonitoringUtil().removeCurrentInstruction(this); - } - } else if (mlCtxObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj; - if (this instanceof ComputationSPInstruction - && mlCtx != null && mlCtx.getSparkMonitoringUtil() != null - && ec instanceof SparkExecutionContext ) { - SparkExecutionContext sec = (SparkExecutionContext) ec; - sec.setDebugString(this, ((ComputationSPInstruction) this).getOutputVariableName()); - mlCtx.getSparkMonitoringUtil().removeCurrentInstruction(this); - } - } - //maintain statistics Statistics.incrementNoOfExecutedSPInst(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java deleted file mode 100644 index edb2417..0000000 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.sysml.runtime.instructions.spark.functions; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; - -import org.apache.spark.SparkContext; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; -import org.apache.spark.storage.RDDInfo; -import org.apache.spark.ui.jobs.StagesTab; -import org.apache.spark.ui.jobs.UIData.TaskUIData; -import org.apache.spark.ui.scope.RDDOperationGraphListener; -import org.apache.sysml.api.MLContextProxy; -import org.apache.sysml.runtime.instructions.spark.SPInstruction; - -import scala.Option; -import scala.collection.Iterator; -import scala.collection.Seq; -import scala.xml.Node; - -// Instead of extending org.apache.spark.JavaSparkListener -/** - * This class is only used by MLContext for now. It is used to provide UI data for Python notebook. - * - */ -public class SparkListener extends RDDOperationGraphListener { - - - public SparkListener(SparkContext sc) { - super(sc.conf()); - this._sc = sc; - } - - // protected SparkExecutionContext sec = null; - protected SparkContext _sc = null; - protected Set<SPInstruction> currentInstructions = new HashSet<SPInstruction>(); - private HashMap<Integer, ArrayList<TaskUIData>> stageTaskMapping = new HashMap<Integer, ArrayList<TaskUIData>>(); - - public HashMap<Integer, Seq<Node>> stageDAGs = new HashMap<Integer, Seq<Node>>(); - public HashMap<Integer, Seq<Node>> stageTimeline = new HashMap<Integer, Seq<Node>>(); - public HashMap<Integer, Seq<Node>> jobDAGs = new HashMap<Integer, Seq<Node>>(); - public HashMap<Integer, Long> stageExecutionTime = new HashMap<Integer, Long>(); - public HashMap<Integer, ArrayList<Integer>> stageRDDMapping = new HashMap<Integer, ArrayList<Integer>>(); - - public void addCurrentInstruction(SPInstruction inst) { - synchronized(currentInstructions) { - currentInstructions.add(inst); - } - } - - public void removeCurrentInstruction(SPInstruction inst) { - synchronized(currentInstructions) { - currentInstructions.remove(inst); - } - } - - @Override - public void onExecutorMetricsUpdate( - SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - super.onExecutorMetricsUpdate(executorMetricsUpdate); - } - - - @SuppressWarnings("deprecation") - @Override - public void onJobEnd(org.apache.spark.scheduler.SparkListenerJobEnd jobEnd) { - super.onJobEnd(jobEnd); - int jobID = jobEnd.jobId(); - Seq<Node> jobNodes = org.apache.spark.ui.UIUtils.showDagVizForJob(jobID, this.getOperationGraphForJob(jobID)); - jobDAGs.put(jobID, jobNodes); - synchronized(currentInstructions) { - for(SPInstruction inst : currentInstructions) { - Object mlContextObj = MLContextProxy.getActiveMLContext(); - if (mlContextObj != null) { - if (mlContextObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlContext = (org.apache.sysml.api.MLContext) mlContextObj; - if (mlContext.getMonitoringUtil() != null) { - mlContext.getMonitoringUtil().setJobId(inst, jobID); - } - } else if (mlContextObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlContext = (org.apache.sysml.api.mlcontext.MLContext) mlContextObj; - if (mlContext.getSparkMonitoringUtil() != null) { - mlContext.getSparkMonitoringUtil().setJobId(inst, jobID); - } - } - } - } - } - } - - @SuppressWarnings("deprecation") - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - super.onStageSubmitted(stageSubmitted); - // stageSubmitted.stageInfo() - - Integer stageID = stageSubmitted.stageInfo().stageId(); - synchronized(currentInstructions) { - stageTaskMapping.put(stageID, new ArrayList<TaskUIData>()); - } - - Option<org.apache.spark.ui.scope.RDDOperationGraph> rddOpGraph = Option.apply(org.apache.spark.ui.scope.RDDOperationGraph.makeOperationGraph(stageSubmitted.stageInfo())); - - Iterator<RDDInfo> iter = stageSubmitted.stageInfo().rddInfos().toList().toIterator(); - ArrayList<Integer> rddIDs = new ArrayList<Integer>(); - while(iter.hasNext()) { - RDDInfo rddInfo = iter.next(); - rddIDs.add(rddInfo.id()); - } - stageRDDMapping.put(stageSubmitted.stageInfo().stageId(), rddIDs); - - - Seq<Node> stageDAG = org.apache.spark.ui.UIUtils.showDagVizForStage(stageID, rddOpGraph); - stageDAGs.put(stageID, stageDAG); - - // Use org.apache.spark.ui.jobs.StagePage, org.apache.spark.ui.jobs.JobPage's makeTimeline method() to print timeline -// try { - ArrayList<TaskUIData> taskUIData = stageTaskMapping.get(stageID); - Seq<Node> currentStageTimeline = (new org.apache.spark.ui.jobs.StagePage(new StagesTab(_sc.ui().get()))) - .makeTimeline( - scala.collection.JavaConversions.asScalaBuffer(taskUIData).toList(), - System.currentTimeMillis()); - stageTimeline.put(stageID, currentStageTimeline); -// } -// catch(Exception e) {} // Ignore - // Seq<RDDInfo> rddsInvolved = stageSubmitted.stageInfo().rddInfos(); - - synchronized(currentInstructions) { - for(SPInstruction inst : currentInstructions) { - Object mlContextObj = MLContextProxy.getActiveMLContext(); - if (mlContextObj != null) { - if (mlContextObj instanceof org.apache.sysml.api.MLContext) { - org.apache.sysml.api.MLContext mlContext = (org.apache.sysml.api.MLContext) mlContextObj; - if (mlContext.getMonitoringUtil() != null) { - mlContext.getMonitoringUtil().setStageId(inst, stageSubmitted.stageInfo().stageId()); - } - } else if (mlContextObj instanceof org.apache.sysml.api.mlcontext.MLContext) { - org.apache.sysml.api.mlcontext.MLContext mlContext = (org.apache.sysml.api.mlcontext.MLContext) mlContextObj; - if (mlContext.getSparkMonitoringUtil() != null) { - mlContext.getSparkMonitoringUtil().setStageId(inst, stageSubmitted.stageInfo().stageId()); - } - } - } - } - } - } - - @Override - public void onTaskEnd(org.apache.spark.scheduler.SparkListenerTaskEnd taskEnd) { - Integer stageID = taskEnd.stageId(); - - synchronized(currentInstructions) { - if(stageTaskMapping.containsKey(stageID)) { - //Option<String> errorMessage = Option.apply(null); // TODO - //TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), errorMessage); - TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), null); //TODO - stageTaskMapping.get(stageID).add(taskData); - } - else { - // TODO: throw exception - } - } - }; - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - super.onStageCompleted(stageCompleted); - try { - long completionTime = Long.parseLong(stageCompleted.stageInfo().completionTime().get().toString()); - long submissionTime = Long.parseLong(stageCompleted.stageInfo().submissionTime().get().toString()); - stageExecutionTime.put(stageCompleted.stageInfo().stageId(), completionTime-submissionTime); - } - catch(Exception e) {} - } - -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1b4f1ec4/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java index a4ee2bb..0e826a3 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java @@ -48,9 +48,9 @@ public class DataFrameRowFrameConversionTest extends AutomatedTestBase private final static String TEST_NAME = "DataFrameConversion"; private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameRowFrameConversionTest.class.getSimpleName() + "/"; - private final static int rows1 = 2245; - private final static int cols1 = 745; - private final static int cols2 = 1264; + private final static int rows1 = 1045; + private final static int cols1 = 545; + private final static int cols2 = 864; private final static double sparsity1 = 0.9; private final static double sparsity2 = 0.1; private final static double eps=0.0000000001; @@ -216,7 +216,9 @@ public class DataFrameRowFrameConversionTest extends AutomatedTestBase //setup spark context sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); JavaSparkContext sc = sec.getSparkContext(); + sc.getConf().set("spark.memory.offHeap.enabled", "false"); SQLContext sqlctx = new SQLContext(sc); + sqlctx.setConf("spark.sql.codegen.wholeStage", "false"); //get binary block input rdd JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
