liuxunorg commented on a change in pull request #107: SUBMARINE-277. Support
Spark Interpreter add sparkSQL interpreter
URL: https://github.com/apache/submarine/pull/107#discussion_r350595993
##########
File path:
submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
##########
@@ -18,158 +18,72 @@
*/
package org.apache.submarine.interpreter;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.spark.SparkContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
-public class SparkInterpreter extends InterpreterProcess {
+public class SparkInterpreter extends AbstractInterpreter {
private static final Logger LOG =
LoggerFactory.getLogger(SparkInterpreter.class);
- private org.apache.zeppelin.spark.SparkInterpreter zpleSparkInterpreter;
- private InterpreterContext intpContext;
-
- private String extractScalaVersion() throws InterpreterException {
- String scalaVersionString = scala.util.Properties.versionString();
- LOG.info("Using Scala: " + scalaVersionString);
- if (scalaVersionString.contains("version 2.10")) {
- return "2.10";
- } else if (scalaVersionString.contains("version 2.11")) {
- return "2.11";
- } else if (scalaVersionString.contains("version 2.12")) {
- return "2.12";
- } else {
- throw new InterpreterException("Unsupported scala version: " +
scalaVersionString);
- }
- }
-
public SparkInterpreter(Properties properties) {
properties = mergeZeplSparkIntpProp(properties);
- zpleSparkInterpreter = new
org.apache.zeppelin.spark.SparkInterpreter(properties);
- zpleSparkInterpreter.setInterpreterGroup(new InterpreterGroup());
- intpContext = this.getIntpContext();
+ this.zeppelinInterpreter = new
org.apache.zeppelin.spark.SparkInterpreter(properties);
+ this.setInterpreterGroup(new InterpreterGroup());
}
+
public SparkInterpreter() {
this(new Properties());
}
@Override
- public void open() {
+ public boolean test() {
try {
- ClassLoader scalaInterpreterClassLoader = null;
- String submarineHome = System.getenv("SUBMARINE_HOME");
- String interpreterDir = "";
- if (StringUtils.isBlank(submarineHome)) {
- LOG.warn("SUBMARINE_HOME is not set, default interpreter directory is
../ ");
- interpreterDir = "..";
- } else {
- interpreterDir = submarineHome + "/workbench/interpreter";
- }
- String scalaVersion = extractScalaVersion();
- File scalaJarFolder = new File(interpreterDir + "/spark/scala-" +
scalaVersion);
- List<URL> urls = new ArrayList<>();
- for (File file : scalaJarFolder.listFiles()) {
- LOG.info("Add file " + file.getAbsolutePath() + " to classpath of
spark scala interpreter: "
- + scalaJarFolder);
- urls.add(file.toURI().toURL());
- }
- scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new
URL[0]),
- Thread.currentThread().getContextClassLoader());
- if (scalaInterpreterClassLoader != null) {
-
Thread.currentThread().setContextClassLoader(scalaInterpreterClassLoader);
+ open();
+ String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2,
null)))\n" +
+ "df.show()";
+ InterpreterResult result = interpret(code);
+ LOG.info("Execution Spark Interpreter, Calculation Spark Code {},
Result = {}",
+ code, result.message().get(0).getData()
+ );
+
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ close();
+ return false;
}
- zpleSparkInterpreter.open();
+ boolean success = (result.message().get(0).getData().contains(
+ "+---+----+\n" +
+ "| _1| _2|\n" +
+ "+---+----+\n" +
+ "| 1| a|\n" +
+ "| 2|null|\n" +
+ "+---+----+")
+ );
+ close();
+ return success;
} catch (InterpreterException e) {
- LOG.error(e.getMessage(), e);
- } catch (MalformedURLException e) {
- LOG.error(e.getMessage(), e);
+ e.printStackTrace();
Review comment:
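This needs to be changed to `LOG.error(e.getMessage(), e);` instead of `e.printStackTrace()`, so that the exception is reported through the logger.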
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]