This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 9857b70 SUBMARINE-277. Support Spark Interpreter
9857b70 is described below
commit 9857b703e5d89cafb39f1f0ce863578f27703523
Author: Zhonghao Lu <[email protected]>
AuthorDate: Sat Nov 9 11:53:36 2019 +0800
SUBMARINE-277. Support Spark Interpreter
### What is this PR for?
support spark interpreter for submarine
### What type of PR is it?
Feature
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-277
### How should this be tested?
CI : https://travis-ci.org/luzhonghao/hadoop-submarine/builds/608108764
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Zhonghao Lu <[email protected]>
Author: luzhonghao <[email protected]>
Closes #82 from luzhonghao/SUBMARINE-277 and squashes the following commits:
7279ea0 [luzhonghao] fix conflict
ed96c06 [Zhonghao Lu] fix failed to load submarine class
1d12880 [Zhonghao Lu] add README.md for spark interpreter
2b1c7b3 [Zhonghao Lu] call close() in test method for SparkInterpreter
2f7860b [Zhonghao Lu] remove useless test cases
1925958 [luzhonghao] [SUBMARINE-277]. Support Spark Interpreter
f4528cb [Zhonghao Lu] remove redundant spaces
aa50fc6 [Zhonghao Lu] add test code for sparkInterpreter
928c79a [Zhonghao Lu] remove unused dependency
02405e3 [Zhonghao Lu] fix typo
60d5e15 [Zhonghao Lu] remove useless log
2c0350c [Zhonghao Lu] Merge branch 'master' into SUBMARINE-277
5b4d418 [luzhonghao] [SUBMARINE-277]. Support Spark Interpreter
---
.travis.yml | 2 +-
pom.xml | 2 +
submarine-dist/src/assembly/distribution.xml | 14 +
.../submarine/interpreter/InterpreterProcess.java | 16 +
submarine-workbench/interpreter/pom.xml | 1 +
.../interpreter/python-interpreter/pom.xml | 4 +-
.../interpreter/spark-interpreter/README.md | 75 ++++
.../interpreter/spark-interpreter/pom.xml | 265 ++++++++++++++
.../submarine/interpreter/SparkInterpreter.java | 175 ++++++++++
.../interpreter/SparkInterpreterTest.java | 387 +++++++++++++++++++++
10 files changed, 938 insertions(+), 3 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 134e5ff..313743e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,7 +42,7 @@ env:
# If you need to compile Phadoop-3.1 or Phadoop-3.2, you need to add
`!submarine-server/server-submitter/submitter-yarnservice` in EXCLUDE_SUBMARINE
-
EXCLUDE_SUBMARINE="!submarine-all,!submarine-client,!submarine-commons,!submarine-commons/commons-runtime,!submarine-dist,!submarine-server/server-submitter/submitter-yarn"
-
EXCLUDE_WORKBENCH="!submarine-workbench,!submarine-workbench/workbench-web,!submarine-workbench/workbench-server"
- -
EXCLUDE_INTERPRETER="!submarine-workbench/interpreter,!submarine-workbench/interpreter/interpreter-engine,!submarine-workbench/interpreter/python-interpreter"
+ -
EXCLUDE_INTERPRETER="!submarine-workbench/interpreter,!submarine-workbench/interpreter/interpreter-engine,!submarine-workbench/interpreter/python-interpreter,!submarine-workbench/interpreter/spark-interpreter"
-
EXCLUDE_SUBMODULE_TONY="!submodules/tony,!submodules/tony/tony-mini,!submodules/tony/tony-core,!submodules/tony/tony-proxy,!submodules/tony/tony-portal,!submodules/tony/tony-azkaban,!submodules/tony/tony-cli"
before_install:
diff --git a/pom.xml b/pom.xml
index 02e1e94..66e1dd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,8 @@
<zeppelin.version>0.9.0-SNAPSHOT</zeppelin.version>
<jgit.version>5.5.1.201910021850-r</jgit.version>
<atomix.version>3.0.0-rc4</atomix.version>
+ <spark.scala.version>2.11.8</spark.scala.version>
+ <spark.scala.binary.version>2.11</spark.scala.binary.version>
<hive.version>2.1.1</hive.version>
</properties>
diff --git a/submarine-dist/src/assembly/distribution.xml
b/submarine-dist/src/assembly/distribution.xml
index 33e3783..76926f2 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -136,6 +136,20 @@
</includes>
</fileSet>
<fileSet>
+
<directory>../submarine-workbench/interpreter/spark/scala-2.11</directory>
+
<outputDirectory>/workbench/interpreter/spark/scala-2.11</outputDirectory>
+ <includes>
+ <include>spark-scala-2.11-${zeppelin.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+
<directory>../submarine-workbench/interpreter/spark-interpreter/target</directory>
+ <outputDirectory>/workbench/interpreter/spark</outputDirectory>
+ <includes>
+ <include>spark-interpreter-${project.version}-shade.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>../submarine-commons/commons-cluster/target</directory>
<outputDirectory>/lib/commons</outputDirectory>
<includes>
diff --git
a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index 67969fe..202f6a2 100644
---
a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++
b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -155,6 +155,8 @@ public class InterpreterProcess extends Thread implements
Interpreter {
String superIntpClassName = "";
if (StringUtils.equals(intpName, "python")) {
superIntpClassName =
"org.apache.submarine.interpreter.PythonInterpreter";
+ } else if (StringUtils.equals(intpName, "spark")) {
+ superIntpClassName = "org.apache.submarine.interpreter.SparkInterpreter";
} else {
superIntpClassName =
"org.apache.submarine.interpreter.InterpreterProcess";
}
@@ -246,6 +248,20 @@ public class InterpreterProcess extends Thread implements
Interpreter {
}
}
+  /**
+   * Combines the caller's properties with the two Zeppelin Spark settings
+   * fixed by this class ({@code zeppelin.spark.maxResult} and
+   * {@code zeppelin.spark.scala.color}).
+   *
+   * <p>When {@code newProps} is non-null it is modified in place and returned;
+   * the fixed settings are copied into it last, so they replace any entries
+   * the caller supplied under the same keys.
+   *
+   * @param newProps caller supplied properties, may be {@code null}
+   * @return {@code newProps} with the fixed settings applied, or a new
+   *         {@link Properties} holding only the fixed settings when
+   *         {@code newProps} is {@code null}
+   */
+  protected Properties mergeZeplSparkIntpProp(Properties newProps) {
+    Properties fixedSettings = new Properties();
+    fixedSettings.setProperty("zeppelin.spark.maxResult", "1000");
+    fixedSettings.setProperty("zeppelin.spark.scala.color", "false");
+
+    if (newProps == null) {
+      return fixedSettings;
+    }
+
+    newProps.putAll(fixedSettings);
+    return newProps;
+  }
+
protected static InterpreterContext getIntpContext() {
return InterpreterContext.builder()
.setInterpreterOut(new InterpreterOutput(null))
diff --git a/submarine-workbench/interpreter/pom.xml
b/submarine-workbench/interpreter/pom.xml
index 5ca3ed3..f24fec5 100644
--- a/submarine-workbench/interpreter/pom.xml
+++ b/submarine-workbench/interpreter/pom.xml
@@ -39,6 +39,7 @@
<modules>
<module>interpreter-engine</module>
<module>python-interpreter</module>
+ <module>spark-interpreter</module>
</modules>
</project>
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml
b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 71b2c59..810087e 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -32,8 +32,8 @@
<artifactId>python-interpreter</artifactId>
<version>0.3.0-SNAPSHOT</version>
- <name>Submarine: Interpreter Pythone</name>
- <description>Submarine Pythone Interpreter</description>
+ <name>Submarine: Interpreter Python</name>
+ <description>Submarine Python Interpreter</description>
<dependencies>
<dependency>
diff --git a/submarine-workbench/interpreter/spark-interpreter/README.md
b/submarine-workbench/interpreter/spark-interpreter/README.md
new file mode 100644
index 0000000..9b210ce
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/README.md
@@ -0,0 +1,75 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# Submarine Spark Interpreter
+
+## Test Submarine Spark Interpreter
+
+### Execute test command
+```
+export SUBMARINE_HOME=/path/to/your/submarine_home
+java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark
spark-interpreter-id test
+```
+
+### Print test result
+```
+ INFO [2019-11-09 11:12:04,888] ({main} ContextHandler.java[doStart]:781) -
Started
o.s.j.s.ServletContextHandler@58b97c15{/stages/stage/kill,null,AVAILABLE,@Spark}
+ INFO [2019-11-09 11:12:04,889] ({main} Logging.scala[logInfo]:54) - Bound
SparkUI to 0.0.0.0, and started at http://10.0.0.3:4040
+ INFO [2019-11-09 11:12:04,923] ({main} Logging.scala[logInfo]:54) - Starting
executor ID driver on host localhost
+ INFO [2019-11-09 11:12:04,927] ({main} Logging.scala[logInfo]:54) - Using
REPL class URI: spark://10.0.0.3:64837/classes
+ INFO [2019-11-09 11:12:04,937] ({main} Logging.scala[logInfo]:54) -
Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 64838.
+ INFO [2019-11-09 11:12:04,937] ({main} Logging.scala[logInfo]:54) - Server
created on 10.0.0.3:64838
+ INFO [2019-11-09 11:12:04,938] ({main} Logging.scala[logInfo]:54) - Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy
+ INFO [2019-11-09 11:12:04,950] ({main} Logging.scala[logInfo]:54) -
Registering BlockManager BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,952] ({dispatcher-event-loop-10}
Logging.scala[logInfo]:54) - Registering block manager 10.0.0.3:64838 with
2004.6 MB RAM, BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,954] ({main} Logging.scala[logInfo]:54) -
Registered BlockManager BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,954] ({main} Logging.scala[logInfo]:54) -
Initialized BlockManager: BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:07,727] ({main} ContextHandler.java[doStart]:781) -
Started
o.s.j.s.ServletContextHandler@7aac6d13{/SQL/execution/json,null,AVAILABLE,@Spark}
+ INFO [2019-11-09 11:12:07,735] ({main} ContextHandler.java[doStart]:781) -
Started
o.s.j.s.ServletContextHandler@89017e5{/static/sql,null,AVAILABLE,@Spark}
+- LocalRelation <empty>, [_1#0, _2#1]
+ INFO [2019-11-09 11:12:08,499] ({main} Logging.scala[logInfo]:54) - Code
generated in 81.495131 ms
+ INFO [2019-11-09 11:12:08,518] ({main} Logging.scala[logInfo]:54) - Code
generated in 11.738758 ms
+ INFO [2019-11-09 11:12:08,525] ({main} SparkInterpreter.java[test]:159) -
Execution Spark Interpreter, Calculation Spark Code val df =
spark.createDataFrame(Seq((1,"a"),(2, null)))
+
+ df.show(), Result = +---+----+
+ | _1 | _2|
+ +---+----+
+ | 1| a|
+ | 2|null|
+ +---+----+
+
+ df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
+ INFO [2019-11-09 11:12:08,525] ({main} SparkInterpreter.java[close]:159) -
Close SparkInterpreter
+ INFO [2019-11-09 11:12:08,536] ({main} Logging.scala[logInfo]:54) - Stopped
Spark web UI at http://10.0.0.3:4040
+ INFO [2019-11-09 11:12:08,539] ({dispatcher-event-loop-15}
Logging.scala[logInfo]:54) - MapOutputTrackerMasterEndpoint stopped!
+ INFO [2019-11-09 11:12:08,542] ({main} Logging.scala[logInfo]:54) -
MemoryStore cleared
+ INFO [2019-11-09 11:12:08,542] ({main} Logging.scala[logInfo]:54) -
BlockManager stopped
+ INFO [2019-11-09 11:12:08,544] ({main} Logging.scala[logInfo]:54) -
BlockManagerMaster stopped
+ INFO [2019-11-09 11:12:08,546] ({dispatcher-event-loop-4}
Logging.scala[logInfo]:54) - OutputCommitCoordinator stopped!
+ INFO [2019-11-09 11:12:08,551] ({main} Logging.scala[logInfo]:54) -
Successfully stopped SparkContext
+ INFO [2019-11-09 11:12:08,551] ({main} Logging.scala[logInfo]:54) -
SparkContext already stopped.
+ INFO [2019-11-09 11:12:08,551] ({main} InterpreterProcess.java[<init>]:120) -
Interpreter test result: true
+ INFO [2019-11-09 11:12:08,552] ({shutdown-hook-0} Logging.scala[logInfo]:54)
- Shutdown hook called
+ INFO [2019-11-09 11:12:08,553] ({shutdown-hook-0} Logging.scala[logInfo]:54)
- Deleting directory
/private/var/folders/xl/_xb3fgzj5zd698khfz6z74cc0000gn/T/spark-2f4acad9-a72d-4bca-8d85-3ef310f0b08c
+```
+
+
+## Debug Submarine Spark Interpreter
+
+### Execute debug command
+
+```
+java -jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
+```
+
+Connect via remote debugging in IDEA
diff --git a/submarine-workbench/interpreter/spark-interpreter/pom.xml
b/submarine-workbench/interpreter/spark-interpreter/pom.xml
new file mode 100644
index 0000000..0ba95a9
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>interpreter</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-interpreter</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ <name>Submarine: Interpreter Spark</name>
+ <description>Submarine spark Interpreter</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>interpreter-engine</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>spark-interpreter</artifactId>
+ <version>${zeppelin.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-interpreter</artifactId>
+ <version>${zeppelin.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-raft</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.atomix</groupId>
+ <artifactId>atomix-primary-backup</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${jsr305.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>${commons-lang.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <!-- Test libraries -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-spark-dependencies</artifactId>
+ <version>${zeppelin.version}</version>
+ </dependency>
+
+    <!--
+      Versioned through ${zeppelin.version} like the other Zeppelin artifacts in
+      this POM; the assembly descriptor picks the jar up by the name
+      spark-scala-2.11-${zeppelin.version}.jar, so the two must stay in step.
+    -->
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.11</artifactId>
+      <version>${zeppelin.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${plugin.shade.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+
<outputFile>target/${project.artifactId}-${project.version}-shade.jar</outputFile>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+ </transformer>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+
+ <executions>
+ <execution>
+ <id>copy-dependencies-runtime</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+
<includeArtifactIds>spark-scala-2.11</includeArtifactIds>
+
<includeGroupIds>org.apache.zeppelin</includeGroupIds>
+
<outputDirectory>../spark/scala-2.11</outputDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-dependencies-system</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeScope>system</includeScope>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
diff --git
a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
new file mode 100644
index 0000000..40ee160
--- /dev/null
+++
b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Submarine wrapper around Zeppelin's Spark interpreter.
+ *
+ * <p>Every operation is delegated to an
+ * {@code org.apache.zeppelin.spark.SparkInterpreter} instance. Checked
+ * exceptions raised by the delegate are logged and not rethrown.
+ */
+public class SparkInterpreter extends InterpreterProcess {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkInterpreter.class);
+
+  /** Snippet executed by {@link #test()}. */
+  private static final String TEST_CODE =
+      "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+      "df.show()";
+
+  /**
+   * Table that {@link #test()} expects {@code df.show()} to print. Cells are
+   * padded to the column widths drawn by the border rows.
+   */
+  private static final String EXPECTED_TEST_TABLE =
+      "+---+----+\n" +
+      "| _1|  _2|\n" +
+      "+---+----+\n" +
+      "|  1|   a|\n" +
+      "|  2|null|\n" +
+      "+---+----+";
+
+  private org.apache.zeppelin.spark.SparkInterpreter zpleSparkInterpreter;
+  private InterpreterContext intpContext;
+
+  /**
+   * Maps the running Scala library version to its binary version.
+   *
+   * @return {@code "2.10"}, {@code "2.11"} or {@code "2.12"}
+   * @throws InterpreterException if the version string names none of them
+   */
+  private String extractScalaVersion() throws InterpreterException {
+    String scalaVersionString = scala.util.Properties.versionString();
+    LOG.info("Using Scala: " + scalaVersionString);
+    for (String supported : new String[] {"2.10", "2.11", "2.12"}) {
+      if (scalaVersionString.contains("version " + supported)) {
+        return supported;
+      }
+    }
+    throw new InterpreterException("Unsupported scala version: " + scalaVersionString);
+  }
+
+  /**
+   * Creates the wrapper and its Zeppelin delegate.
+   *
+   * @param properties interpreter properties; they are passed through
+   *                   {@code mergeZeplSparkIntpProp} before being handed to
+   *                   the Zeppelin interpreter
+   */
+  public SparkInterpreter(Properties properties) {
+    Properties merged = mergeZeplSparkIntpProp(properties);
+    zpleSparkInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(merged);
+    zpleSparkInterpreter.setInterpreterGroup(new InterpreterGroup());
+    intpContext = getIntpContext();
+  }
+
+  /** Creates the wrapper starting from an empty {@link Properties} object. */
+  public SparkInterpreter() {
+    this(new Properties());
+  }
+
+  /**
+   * Puts the jars found under {@code <interpreterDir>/spark/scala-<version>}
+   * on a new context class loader for the current thread and then opens the
+   * Zeppelin interpreter. Failures are logged, not rethrown.
+   */
+  @Override
+  public void open() {
+    try {
+      String interpreterDir = resolveInterpreterDir();
+      String scalaVersion = extractScalaVersion();
+      File scalaJarFolder = new File(interpreterDir + "/spark/scala-" + scalaVersion);
+
+      ClassLoader scalaInterpreterClassLoader = new URLClassLoader(
+          collectJarUrls(scalaJarFolder),
+          Thread.currentThread().getContextClassLoader());
+      Thread.currentThread().setContextClassLoader(scalaInterpreterClassLoader);
+
+      zpleSparkInterpreter.open();
+    } catch (InterpreterException | MalformedURLException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Returns the interpreter directory, derived from SUBMARINE_HOME when it is set. */
+  private String resolveInterpreterDir() {
+    String submarineHome = System.getenv("SUBMARINE_HOME");
+    if (StringUtils.isBlank(submarineHome)) {
+      LOG.warn("SUBMARINE_HOME is not set, default interpreter directory is ../ ");
+      return "..";
+    }
+    return submarineHome + "/workbench/interpreter";
+  }
+
+  /** Lists the entries of {@code scalaJarFolder} as URLs; empty if it cannot be listed. */
+  private URL[] collectJarUrls(File scalaJarFolder) throws MalformedURLException {
+    List<URL> urls = new ArrayList<>();
+    // File.listFiles() returns null, not an empty array, when the path is
+    // not a directory or cannot be read.
+    File[] files = scalaJarFolder.listFiles();
+    if (files == null) {
+      LOG.error("Spark scala interpreter directory {} is not a readable directory",
+          scalaJarFolder.getAbsolutePath());
+      return new URL[0];
+    }
+    for (File file : files) {
+      LOG.info("Add file {} to classpath of spark scala interpreter: {}",
+          file.getAbsolutePath(), scalaJarFolder);
+      urls.add(file.toURI().toURL());
+    }
+    return urls.toArray(new URL[0]);
+  }
+
+  /**
+   * Runs {@code code} through the Zeppelin interpreter and appends the
+   * messages captured in the interpreter context output to the result.
+   *
+   * @param code source to execute
+   * @return the result, or {@code null} when the delegate threw before a
+   *         result object could be created
+   */
+  @Override
+  public InterpreterResult interpret(String code) {
+    InterpreterResult interpreterResult = null;
+    try {
+      org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
+          = zpleSparkInterpreter.interpret(code, intpContext);
+      interpreterResult = new InterpreterResult(zeplInterpreterResult);
+
+      List<InterpreterResultMessage> interpreterResultMessages =
+          intpContext.out.toInterpreterResultMessage();
+      for (InterpreterResultMessage message : interpreterResultMessages) {
+        interpreterResult.add(message);
+      }
+    } catch (InterpreterException | IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    return interpreterResult;
+  }
+
+  /** Closes the Zeppelin interpreter; a failure is logged. */
+  @Override
+  public void close() {
+    try {
+      zpleSparkInterpreter.close();
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Asks the Zeppelin interpreter to cancel; a failure is logged. */
+  @Override
+  public void cancel() {
+    try {
+      zpleSparkInterpreter.cancel(intpContext);
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @return the progress reported by the Zeppelin interpreter, or {@code 0}
+   *         when the query fails
+   */
+  @Override
+  public int getProgress() {
+    int progress = 0;
+    try {
+      progress = zpleSparkInterpreter.getProgress(intpContext);
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    return progress;
+  }
+
+  /**
+   * Opens the interpreter, runs a small DataFrame snippet and checks the
+   * printed table. The interpreter is closed again on every path.
+   *
+   * @return {@code true} only if the snippet succeeded and its first message
+   *         contains the expected table
+   */
+  @Override
+  public boolean test() {
+    open();
+    try {
+      InterpreterResult result = interpret(TEST_CODE);
+      // interpret() yields null when the delegate failed, and a result may
+      // carry no message at all; neither case can pass the check.
+      if (result == null || result.message() == null || result.message().isEmpty()) {
+        LOG.error("Spark interpreter produced no output for test code {}", TEST_CODE);
+        return false;
+      }
+
+      String data = result.message().get(0).getData();
+      LOG.info("Execution Spark Interpreter, Calculation Spark Code {}, Result = {}",
+          TEST_CODE, data);
+
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        return false;
+      }
+      return data != null && data.contains(EXPECTED_TEST_TABLE);
+    } finally {
+      close();
+    }
+  }
+}
diff --git
a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
new file mode 100644
index 0000000..9672e21
--- /dev/null
+++
b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.interpreter;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SparkInterpreterTest {
+ private SparkInterpreter interpreter;
+
+ // catch the streaming output in onAppend
+ private volatile String output = "";
+ // catch the interpreter output in onUpdate
+ private InterpreterResultMessageOutput messageOutput;
+
+ private RemoteInterpreterEventClient mockRemoteEventClient;
+
+  /** Creates a fresh Mockito mock of the remote event client before each test. */
+  @Before
+  public void setUp() {
+    this.mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+  }
+
+  /**
+   * End-to-end walk through the interpreter: Scala evaluation, stdout capture,
+   * incomplete and invalid input, comments, DataFrame output, ZeppelinContext
+   * helpers, progress reporting and cancellation.
+   *
+   * @throws InterruptedException if the test thread is interrupted while
+   *         polling or joining the background interpreter threads
+   */
+  @Test
+  public void testSparkInterpreter() throws InterruptedException {
+    // NOTE(review): these properties are built but nothing below reads them;
+    // the interpreter is created with its no-arg constructor. Confirm how the
+    // settings are meant to reach it.
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    InterpreterContext.set(context);
+
+    interpreter = new SparkInterpreter();
+    // Let a failed open() fail the test right here instead of printing the
+    // stack trace and running every later assertion against an interpreter
+    // that never started.
+    interpreter.open();
+
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a: String = hello world\n", result.message().get(0).getData());
+
+    result = interpreter.interpret("print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    // java stdout
+    result = interpreter.interpret("System.out.print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    // incomplete
+    result = interpreter.interpret("println(a");
+    assertEquals(InterpreterResult.Code.INCOMPLETE, result.code());
+
+    // syntax error
+    result = interpreter.interpret("println(b)");
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("not found: value b"));
+
+    // multiple line
+    result = interpreter.interpret("\"123\".\ntoInt");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // single line comment
+    result = interpreter.interpret("print(\"hello world\")/*comment here*/");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // multiple line comment
+    result = interpreter.interpret("/*line 1 \n line 2*/");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // test function
+    result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("print(add(1,2))");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // Companion object with case class
+    result = interpreter.interpret("import scala.math._\n" +
+        "object Circle {\n" +
+        " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
+        "}\n" +
+        "case class Circle(radius: Double) {\n" +
+        " import Circle._\n" +
+        " def area: Double = calculateArea(radius)\n" +
+        "}\n" +
+        "\n" +
+        "val circle1 = new Circle(5.0)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // class extend
+    result = interpreter.interpret("import java.util.ArrayList");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    context.setParagraphId("pid_1");
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("45"));
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("45"));
+    result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, " +
+        "edu : String, balance : Integer)\n" +
+        "val bank = bankText.map(s=>s.split(\";\")).filter(s => " +
+        "s(0)!=\"\\\"age\\\"\").map(\n" +
+        " s => Bank(s(0).toInt, \n" +
+        " s(1).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(2).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(3).replaceAll(\"\\\"\", \"\"),\n" +
+        " s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+        " )\n" +
+        ").toDF()");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark version
+    result = interpreter.interpret("sc.version");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark sql test
+    // Cells in the expected tables are padded to the column widths drawn by
+    // the border row (3 and 4 characters), so the inner spaces are significant.
+    String version = result.message().get(0).getData().trim();
+    if (version.contains("String = 1.")) {
+      result = interpreter.interpret("sqlContext");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      result = interpreter.interpret(
+          "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+          "df.show()");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains(
+          "+---+----+\n" +
+          "| _1|  _2|\n" +
+          "+---+----+\n" +
+          "|  1|   a|\n" +
+          "|  2|null|\n" +
+          "+---+----+"));
+    } else if (version.contains("String = 2.")) {
+      result = interpreter.interpret("spark");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      result = interpreter.interpret(
+          "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+          "df.show()");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains(
+          "+---+----+\n" +
+          "| _1|  _2|\n" +
+          "+---+----+\n" +
+          "|  1|   a|\n" +
+          "|  2|null|\n" +
+          "+---+----+"));
+    }
+
+    // ZeppelinContext
+    result = interpreter.interpret("z.show(df)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+    assertEquals("_1\t_2\n1\ta\n2\tnull\n", result.message().get(0).getData());
+
+    result = interpreter.interpret("z.input(\"name\", \"default_name\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // getProgress
+    // An AssertionError thrown on a background thread never reaches JUnit, so
+    // each worker only records its result and the test thread does the
+    // asserting once the worker has terminated.
+    final InterpreterResult[] progressResult = new InterpreterResult[1];
+    Thread interpretThread = new Thread() {
+      @Override
+      public void run() {
+        progressResult[0] = interpreter.interpret(
+            "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))");
+      }
+    };
+    interpretThread.start();
+    boolean nonZeroProgress = false;
+    while (interpretThread.isAlive()) {
+      int progress = interpreter.getProgress();
+      assertTrue(progress >= 0);
+      if (progress != 0 && progress != 100) {
+        nonZeroProgress = true;
+      }
+      Thread.sleep(100);
+    }
+    interpretThread.join();
+    assertTrue(nonZeroProgress);
+    assertTrue("interpret returned no result", progressResult[0] != null);
+    assertEquals(InterpreterResult.Code.SUCCESS, progressResult[0].code());
+
+    // cancel
+    final InterpreterResult[] cancelResult = new InterpreterResult[1];
+    interpretThread = new Thread() {
+      @Override
+      public void run() {
+        cancelResult[0] = interpreter.interpret(
+            "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))");
+      }
+    };
+
+    interpretThread.start();
+    // sleep 1 second to wait for the spark job start
+    Thread.sleep(1000);
+    interpreter.cancel();
+    interpretThread.join();
+    assertTrue("interpret returned no result", cancelResult[0] != null);
+    assertEquals(InterpreterResult.Code.ERROR, cancelResult[0].code());
+    assertTrue(cancelResult[0].message().get(0).getData().contains("cancelled"));
+  }
+
+  /**
+   * Checks that defining a value leaves the captured streaming output empty
+   * while the text of an explicit {@code print} is still returned.
+   */
+  @Test
+  public void testDisableReplOutput() {
+    // NOTE(review): nothing below reads this object, so printREPLOutput=false
+    // may never reach the interpreter — confirm how settings are passed in.
+    Properties sparkProps = new Properties();
+    sparkProps.setProperty("spark.master", "local");
+    sparkProps.setProperty("spark.app.name", "test");
+    sparkProps.setProperty("zeppelin.spark.maxResult", "100");
+    sparkProps.setProperty("zeppelin.spark.test", "true");
+    sparkProps.setProperty("zeppelin.spark.printREPLOutput", "false");
+    // disable color output for easy testing
+    sparkProps.setProperty("zeppelin.spark.scala.color", "false");
+    sparkProps.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    InterpreterContext.set(getInterpreterContext());
+    interpreter = new SparkInterpreter();
+    interpreter.open();
+
+    InterpreterResult definition = interpreter.interpret("val a=\"hello world\"");
+    assertEquals(InterpreterResult.Code.SUCCESS, definition.code());
+    // defining a new variable must not produce any streamed output
+    assertEquals("", output);
+
+    InterpreterResult printed = interpreter.interpret("print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, printed.code());
+    // text written by print is still part of the result
+    assertEquals("hello world", printed.message().get(0).getData());
+  }
+
+  /**
+   * Runs the same Spark job twice in a row and expects both runs to succeed.
+   */
+  @Test
+  public void testSchedulePool() {
+    // NOTE(review): these properties (including spark.scheduler.mode=FAIR) are
+    // built but nothing below reads them — confirm how settings are passed in.
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("spark.scheduler.mode", "FAIR");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter();
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    // The first run used to be discarded without a check; assert it too so a
+    // failure on the initial job is not hidden by the second one.
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // pool is reset to null if the user doesn't specify it via paragraph properties
+    result = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  /**
+   * Scenario {@code spark.ui.enabled: false}: a simple range sum is expected
+   * to succeed.
+   */
+  @Test
+  public void testDisableSparkUI_1() {
+    // NOTE(review): nothing below reads this object — confirm how the
+    // settings are meant to reach the interpreter.
+    Properties sparkProps = new Properties();
+    sparkProps.setProperty("spark.master", "local");
+    sparkProps.setProperty("spark.app.name", "test");
+    sparkProps.setProperty("zeppelin.spark.maxResult", "100");
+    sparkProps.setProperty("zeppelin.spark.test", "true");
+    sparkProps.setProperty("spark.ui.enabled", "false");
+    // disable color output for easy testing
+    sparkProps.setProperty("zeppelin.spark.scala.color", "false");
+    sparkProps.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter();
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    InterpreterResult sumResult = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, sumResult.code());
+  }
+
+  /**
+   * Scenario {@code zeppelin.spark.ui.hidden: true}: a simple range sum is
+   * expected to succeed.
+   */
+  @Test
+  public void testDisableSparkUI_2() {
+    // NOTE(review): nothing below reads this object — confirm how the
+    // settings are meant to reach the interpreter.
+    Properties sparkProps = new Properties();
+    sparkProps.setProperty("spark.master", "local");
+    sparkProps.setProperty("spark.app.name", "test");
+    sparkProps.setProperty("zeppelin.spark.maxResult", "100");
+    sparkProps.setProperty("zeppelin.spark.test", "true");
+    sparkProps.setProperty("zeppelin.spark.ui.hidden", "true");
+    // disable color output for easy testing
+    sparkProps.setProperty("zeppelin.spark.scala.color", "false");
+    sparkProps.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter();
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    InterpreterResult sumResult = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, sumResult.code());
+  }
+
+
+  /** Closes the interpreter created by the test that just ran, if there is one. */
+  @After
+  public void tearDown() {
+    if (interpreter == null) {
+      return;
+    }
+    interpreter.close();
+  }
+
+  /**
+   * Builds an {@code InterpreterContext} whose output is observed by this test.
+   *
+   * <p>Resets {@code output} to the empty string, builds the context, and then
+   * replaces the context's {@code out} with an {@code InterpreterOutput} whose
+   * listener stores the text of the most recently appended message in
+   * {@code output} and the most recently updated message in
+   * {@code messageOutput}.
+   *
+   * @return the newly built context
+   */
+  private InterpreterContext getInterpreterContext() {
+    output = "";
+
+    InterpreterOutputListener recorder = new InterpreterOutputListener() {
+      @Override
+      public void onUpdateAll(InterpreterOutput out) {
+        // nothing to record for bulk updates
+      }
+
+      @Override
+      public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+        try {
+          output = out.toInterpreterResultMessage().getData();
+        } catch (IOException readFailure) {
+          readFailure.printStackTrace();
+        }
+      }
+
+      @Override
+      public void onUpdate(int index, InterpreterResultMessageOutput out) {
+        messageOutput = out;
+      }
+    };
+
+    InterpreterContext recordingContext = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    recordingContext.out = new InterpreterOutput(recorder);
+    return recordingContext;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]