This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 9857b70  SUBMARINE-277. Support Spark Interpreter
9857b70 is described below

commit 9857b703e5d89cafb39f1f0ce863578f27703523
Author: Zhonghao Lu <luzhong...@bupt.edu.cn>
AuthorDate: Sat Nov 9 11:53:36 2019 +0800

    SUBMARINE-277. Support Spark Interpreter
    
    ### What is this PR for?
    Support the Spark interpreter for Submarine.
    
    ### What type of PR is it?
    Feature
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-277
    
    ### How should this be tested?
    CI : https://travis-ci.org/luzhonghao/hadoop-submarine/builds/608108764
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Zhonghao Lu <luzhong...@bupt.edu.cn>
    Author: luzhonghao <luzhonghao12...@163.com>
    
    Closes #82 from luzhonghao/SUBMARINE-277 and squashes the following commits:
    
    7279ea0 [luzhonghao] fix conflict
    ed96c06 [Zhonghao Lu] fix failed to load submarine class
    1d12880 [Zhonghao Lu] add README.md for spark interpreter
    2b1c7b3 [Zhonghao Lu] call close() in test method for SparkInterpreter
    2f7860b [Zhonghao Lu] remove useless test cases
    1925958 [luzhonghao] [SUBMARINE-277]. Support Spark Interpreter
    f4528cb [Zhonghao Lu] remove redundant spaces
    aa50fc6 [Zhonghao Lu] add test code for sparkInterpreter
    928c79a [Zhonghao Lu] remove unused dependency
    02405e3 [Zhonghao Lu] fix typo
    60d5e15 [Zhonghao Lu] remove useless log
    2c0350c [Zhonghao Lu] Merge branch 'master' into SUBMARINE-277
    5b4d418 [luzhonghao] [SUBMARINE-277]. Support Spark Interpreter
---
 .travis.yml                                        |   2 +-
 pom.xml                                            |   2 +
 submarine-dist/src/assembly/distribution.xml       |  14 +
 .../submarine/interpreter/InterpreterProcess.java  |  16 +
 submarine-workbench/interpreter/pom.xml            |   1 +
 .../interpreter/python-interpreter/pom.xml         |   4 +-
 .../interpreter/spark-interpreter/README.md        |  75 ++++
 .../interpreter/spark-interpreter/pom.xml          | 265 ++++++++++++++
 .../submarine/interpreter/SparkInterpreter.java    | 175 ++++++++++
 .../interpreter/SparkInterpreterTest.java          | 387 +++++++++++++++++++++
 10 files changed, 938 insertions(+), 3 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 134e5ff..313743e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,7 +42,7 @@ env:
    # If you need to compile Phadoop-3.1 or Phadoop-3.2, you need to add `!submarine-server/server-submitter/submitter-yarnservice` in EXCLUDE_SUBMARINE
    - EXCLUDE_SUBMARINE="!submarine-all,!submarine-client,!submarine-commons,!submarine-commons/commons-runtime,!submarine-dist,!submarine-server/server-submitter/submitter-yarn"
    - EXCLUDE_WORKBENCH="!submarine-workbench,!submarine-workbench/workbench-web,!submarine-workbench/workbench-server"
-    - EXCLUDE_INTERPRETER="!submarine-workbench/interpreter,!submarine-workbench/interpreter/interpreter-engine,!submarine-workbench/interpreter/python-interpreter"
+    - EXCLUDE_INTERPRETER="!submarine-workbench/interpreter,!submarine-workbench/interpreter/interpreter-engine,!submarine-workbench/interpreter/python-interpreter,!submarine-workbench/interpreter/spark-interpreter"
    - EXCLUDE_SUBMODULE_TONY="!submodules/tony,!submodules/tony/tony-mini,!submodules/tony/tony-core,!submodules/tony/tony-proxy,!submodules/tony/tony-portal,!submodules/tony/tony-azkaban,!submodules/tony/tony-cli"
 
 before_install:
diff --git a/pom.xml b/pom.xml
index 02e1e94..66e1dd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,8 @@
     <zeppelin.version>0.9.0-SNAPSHOT</zeppelin.version>
     <jgit.version>5.5.1.201910021850-r</jgit.version>
     <atomix.version>3.0.0-rc4</atomix.version>
+    <spark.scala.version>2.11.8</spark.scala.version>
+    <spark.scala.binary.version>2.11</spark.scala.binary.version>
     <hive.version>2.1.1</hive.version>
   </properties>
 
diff --git a/submarine-dist/src/assembly/distribution.xml b/submarine-dist/src/assembly/distribution.xml
index 33e3783..76926f2 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -136,6 +136,20 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>../submarine-workbench/interpreter/spark/scala-2.11</directory>
+      <outputDirectory>/workbench/interpreter/spark/scala-2.11</outputDirectory>
+      <includes>
+        <include>spark-scala-2.11-${zeppelin.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>../submarine-workbench/interpreter/spark-interpreter/target</directory>
+      <outputDirectory>/workbench/interpreter/spark</outputDirectory>
+      <includes>
+        <include>spark-interpreter-${project.version}-shade.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>../submarine-commons/commons-cluster/target</directory>
       <outputDirectory>/lib/commons</outputDirectory>
       <includes>
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index 67969fe..202f6a2 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -155,6 +155,8 @@ public class InterpreterProcess extends Thread implements Interpreter {
     String superIntpClassName = "";
     if (StringUtils.equals(intpName, "python")) {
      superIntpClassName = "org.apache.submarine.interpreter.PythonInterpreter";
+    } else if (StringUtils.equals(intpName, "spark")) {
+      superIntpClassName = "org.apache.submarine.interpreter.SparkInterpreter";
     } else {
      superIntpClassName = "org.apache.submarine.interpreter.InterpreterProcess";
     }
@@ -246,6 +248,20 @@ public class InterpreterProcess extends Thread implements Interpreter {
     }
   }
 
+  protected Properties mergeZeplSparkIntpProp(Properties newProps) {
+    Properties properties = new Properties();
+
+    properties.setProperty("zeppelin.spark.maxResult", "1000");
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+
+    if (null != newProps) {
+      newProps.putAll(properties);
+      return newProps;
+    } else {
+      return properties;
+    }
+  }
+
   protected static InterpreterContext getIntpContext() {
     return InterpreterContext.builder()
         .setInterpreterOut(new InterpreterOutput(null))
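
A note on the merge direction in `mergeZeplSparkIntpProp` above: `java.util.Properties` inherits `putAll` from `Hashtable`, so `newProps.putAll(properties)` copies the two hard-coded defaults into the caller's properties last, and they overwrite any values the caller set for the same keys. Below is a minimal sketch of that behavior; the standalone class and `main` are illustrative only, not part of this commit.

```java
import java.util.Properties;

public class MergeDirectionDemo {
  public static void main(String[] args) {
    Properties callerProps = new Properties();
    callerProps.setProperty("zeppelin.spark.maxResult", "5000"); // caller's choice

    Properties defaults = new Properties();
    defaults.setProperty("zeppelin.spark.maxResult", "1000");
    defaults.setProperty("zeppelin.spark.scala.color", "false");

    // Same direction as mergeZeplSparkIntpProp: the defaults are copied in last,
    // so they replace the caller-supplied value for the same key.
    callerProps.putAll(defaults);

    System.out.println(callerProps.getProperty("zeppelin.spark.maxResult")); // prints 1000
  }
}
```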
diff --git a/submarine-workbench/interpreter/pom.xml b/submarine-workbench/interpreter/pom.xml
index 5ca3ed3..f24fec5 100644
--- a/submarine-workbench/interpreter/pom.xml
+++ b/submarine-workbench/interpreter/pom.xml
@@ -39,6 +39,7 @@
   <modules>
     <module>interpreter-engine</module>
     <module>python-interpreter</module>
+    <module>spark-interpreter</module>
   </modules>
 
 </project>
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 71b2c59..810087e 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -32,8 +32,8 @@
 
   <artifactId>python-interpreter</artifactId>
   <version>0.3.0-SNAPSHOT</version>
-  <name>Submarine: Interpreter Pythone</name>
-  <description>Submarine Pythone Interpreter</description>
+  <name>Submarine: Interpreter Python</name>
+  <description>Submarine Python Interpreter</description>
 
   <dependencies>
     <dependency>
diff --git a/submarine-workbench/interpreter/spark-interpreter/README.md b/submarine-workbench/interpreter/spark-interpreter/README.md
new file mode 100644
index 0000000..9b210ce
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/README.md
@@ -0,0 +1,75 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+# Submarine Spark Interpreter
+
+## Test Submarine Spark Interpreter
+
+### Execute test command
+```
+export SUBMARINE_HOME=/path/to/your/submarine_home
+java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id test
+```
+
+### Test output
+```
+ INFO [2019-11-09 11:12:04,888] ({main} ContextHandler.java[doStart]:781) - Started o.s.j.s.ServletContextHandler@58b97c15{/stages/stage/kill,null,AVAILABLE,@Spark}
+ INFO [2019-11-09 11:12:04,889] ({main} Logging.scala[logInfo]:54) - Bound SparkUI to 0.0.0.0, and started at http://10.0.0.3:4040
+ INFO [2019-11-09 11:12:04,923] ({main} Logging.scala[logInfo]:54) - Starting executor ID driver on host localhost
+ INFO [2019-11-09 11:12:04,927] ({main} Logging.scala[logInfo]:54) - Using REPL class URI: spark://10.0.0.3:64837/classes
+ INFO [2019-11-09 11:12:04,937] ({main} Logging.scala[logInfo]:54) - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64838.
+ INFO [2019-11-09 11:12:04,937] ({main} Logging.scala[logInfo]:54) - Server created on 10.0.0.3:64838
+ INFO [2019-11-09 11:12:04,938] ({main} Logging.scala[logInfo]:54) - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
+ INFO [2019-11-09 11:12:04,950] ({main} Logging.scala[logInfo]:54) - Registering BlockManager BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,952] ({dispatcher-event-loop-10} Logging.scala[logInfo]:54) - Registering block manager 10.0.0.3:64838 with 2004.6 MB RAM, BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,954] ({main} Logging.scala[logInfo]:54) - Registered BlockManager BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:04,954] ({main} Logging.scala[logInfo]:54) - Initialized BlockManager: BlockManagerId(driver, 10.0.0.3, 64838, None)
+ INFO [2019-11-09 11:12:07,727] ({main} ContextHandler.java[doStart]:781) - Started o.s.j.s.ServletContextHandler@7aac6d13{/SQL/execution/json,null,AVAILABLE,@Spark}
+ INFO [2019-11-09 11:12:07,735] ({main} ContextHandler.java[doStart]:781) - Started o.s.j.s.ServletContextHandler@89017e5{/static/sql,null,AVAILABLE,@Spark}
+                                   +- LocalRelation <empty>, [_1#0, _2#1]
+ INFO [2019-11-09 11:12:08,499] ({main} Logging.scala[logInfo]:54) - Code generated in 81.495131 ms
+ INFO [2019-11-09 11:12:08,518] ({main} Logging.scala[logInfo]:54) - Code generated in 11.738758 ms
+ INFO [2019-11-09 11:12:08,525] ({main} SparkInterpreter.java[test]:159) - Execution Spark Interpreter, Calculation Spark Code  val df = spark.createDataFrame(Seq((1,"a"),(2, null)))
+ 
+ df.show(), Result = +---+----+
+ | _1|  _2|
+ +---+----+
+ |  1|   a|
+ |  2|null|
+ +---+----+
+
+ df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
+ INFO [2019-11-09 11:12:08,525] ({main} SparkInterpreter.java[close]:159) - Close SparkInterpreter
+ INFO [2019-11-09 11:12:08,536] ({main} Logging.scala[logInfo]:54) - Stopped Spark web UI at http://10.0.0.3:4040
+ INFO [2019-11-09 11:12:08,539] ({dispatcher-event-loop-15} Logging.scala[logInfo]:54) - MapOutputTrackerMasterEndpoint stopped!
+ INFO [2019-11-09 11:12:08,542] ({main} Logging.scala[logInfo]:54) - MemoryStore cleared
+ INFO [2019-11-09 11:12:08,542] ({main} Logging.scala[logInfo]:54) - BlockManager stopped
+ INFO [2019-11-09 11:12:08,544] ({main} Logging.scala[logInfo]:54) - BlockManagerMaster stopped
+ INFO [2019-11-09 11:12:08,546] ({dispatcher-event-loop-4} Logging.scala[logInfo]:54) - OutputCommitCoordinator stopped!
+ INFO [2019-11-09 11:12:08,551] ({main} Logging.scala[logInfo]:54) - Successfully stopped SparkContext
+ INFO [2019-11-09 11:12:08,551] ({main} Logging.scala[logInfo]:54) - SparkContext already stopped.
+ INFO [2019-11-09 11:12:08,551] ({main} InterpreterProcess.java[<init>]:120) - Interpreter test result: true
+ INFO [2019-11-09 11:12:08,552] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Shutdown hook called
+ INFO [2019-11-09 11:12:08,553] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Deleting directory /private/var/folders/xl/_xb3fgzj5zd698khfz6z74cc0000gn/T/spark-2f4acad9-a72d-4bca-8d85-3ef310f0b08c
+```
+
+
+## Debug Submarine Spark Interpreter
+
+### Execute debug command
+
+```
+java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
+```
+
+Then attach IntelliJ IDEA's remote debugger to port 5005.
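
For orientation, here is a minimal sketch of driving the new interpreter programmatically rather than through the shaded jar, using only the API surface introduced in this commit (`SparkInterpreter(Properties)`, `open()`, `interpret(String)`, `close()`). The two Spark properties are assumptions borrowed from the test code further below, and the snippet still expects the `spark/scala-2.11` jar directory that `open()` resolves via `SUBMARINE_HOME`:

```java
import java.util.Properties;

import org.apache.submarine.interpreter.InterpreterResult;
import org.apache.submarine.interpreter.SparkInterpreter;

public class SparkInterpreterDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("spark.master", "local");   // run Spark in-process (assumption)
    props.setProperty("spark.app.name", "demo");  // arbitrary app name (assumption)

    SparkInterpreter interpreter = new SparkInterpreter(props);
    interpreter.open();  // builds the scala-2.11 classloader and starts Spark
    try {
      InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
      System.out.println(result.code());                      // SUCCESS on a healthy session
      System.out.println(result.message().get(0).getData());  // the REPL output
    } finally {
      interpreter.close();  // stops the underlying SparkContext
    }
  }
}
```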
diff --git a/submarine-workbench/interpreter/spark-interpreter/pom.xml b/submarine-workbench/interpreter/spark-interpreter/pom.xml
new file mode 100644
index 0000000..0ba95a9
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/pom.xml
@@ -0,0 +1,265 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+         https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.submarine</groupId>
+        <artifactId>interpreter</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>spark-interpreter</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+    <name>Submarine: Interpreter Spark</name>
+    <description>Submarine Spark Interpreter</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.submarine</groupId>
+            <artifactId>interpreter-engine</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>spark-interpreter</artifactId>
+            <version>${zeppelin.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>zeppelin-interpreter</artifactId>
+            <version>${zeppelin.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.atomix</groupId>
+                    <artifactId>atomix</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.atomix</groupId>
+                    <artifactId>atomix-raft</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.atomix</groupId>
+                    <artifactId>atomix-primary-backup</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-configuration</groupId>
+                    <artifactId>commons-configuration</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-math3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>${jsr305.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>${commons-lang.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <!-- Test libraries -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>zeppelin-spark-dependencies</artifactId>
+            <version>${zeppelin.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.zeppelin</groupId>
+            <artifactId>spark-scala-2.11</artifactId>
+            <version>0.9.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${plugin.shade.version}</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <outputFile>target/${project.artifactId}-${project.version}-shade.jar</outputFile>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+                                </transformer>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <skip>false</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>enforce</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+
+                <executions>
+                    <execution>
+                        <id>copy-dependencies-runtime</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeArtifactIds>spark-scala-2.11</includeArtifactIds>
+                            <includeGroupIds>org.apache.zeppelin</includeGroupIds>
+                            <outputDirectory>../spark/scala-2.11</outputDirectory>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-dependencies-system</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <includeScope>system</includeScope>
+                            <excludeTransitive>true</excludeTransitive>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
new file mode 100644
index 0000000..40ee160
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkInterpreter extends InterpreterProcess {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkInterpreter.class);
+
+  private org.apache.zeppelin.spark.SparkInterpreter zpleSparkInterpreter;
+  private InterpreterContext intpContext;
+
+  private String extractScalaVersion() throws InterpreterException {
+    String scalaVersionString = scala.util.Properties.versionString();
+    LOG.info("Using Scala: " + scalaVersionString);
+    if (scalaVersionString.contains("version 2.10")) {
+      return "2.10";
+    } else if (scalaVersionString.contains("version 2.11")) {
+      return "2.11";
+    } else if (scalaVersionString.contains("version 2.12")) {
+      return "2.12";
+    } else {
+      throw new InterpreterException("Unsupported scala version: " + scalaVersionString);
+    }
+  }
+
+  public SparkInterpreter(Properties properties) {
+    properties = mergeZeplSparkIntpProp(properties);
+    zpleSparkInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(properties);
+    zpleSparkInterpreter.setInterpreterGroup(new InterpreterGroup());
+    intpContext = this.getIntpContext();
+  }
+  public SparkInterpreter() {
+    this(new Properties());
+  }
+
+  @Override
+  public void open() {
+    try {
+      ClassLoader scalaInterpreterClassLoader = null;
+      String submarineHome = System.getenv("SUBMARINE_HOME");
+      String interpreterDir  = "";
+      if (StringUtils.isBlank(submarineHome)) {
+        LOG.warn("SUBMARINE_HOME is not set, default interpreter directory is 
../ ");
+        interpreterDir = "..";
+      } else {
+        interpreterDir = submarineHome + "/workbench/interpreter";
+      }
+      String scalaVersion = extractScalaVersion();
+      File scalaJarFolder = new File(interpreterDir + "/spark/scala-" + scalaVersion);
+      List<URL> urls = new ArrayList<>();
+      for (File file : scalaJarFolder.listFiles()) {
+        LOG.info("Add file " + file.getAbsolutePath() + " to classpath of 
spark scala interpreter: "
+                + scalaJarFolder);
+        urls.add(file.toURI().toURL());
+      }
+      scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
+              Thread.currentThread().getContextClassLoader());
+      if (scalaInterpreterClassLoader != null) {
+        Thread.currentThread().setContextClassLoader(scalaInterpreterClassLoader);
+      }
+      zpleSparkInterpreter.open();
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    } catch (MalformedURLException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String code) {
+    InterpreterResult interpreterResult = null;
+    try {
+      org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
+              = zpleSparkInterpreter.interpret(code, intpContext);
+      interpreterResult = new InterpreterResult(zeplInterpreterResult);
+
+      List<InterpreterResultMessage> interpreterResultMessages =
+              intpContext.out.toInterpreterResultMessage();
+
+      for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
+        interpreterResult.add(message);
+      }
+    } catch (InterpreterException | IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    return interpreterResult;
+  }
+
+  @Override
+  public void close() {
+    try {
+      zpleSparkInterpreter.close();
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void cancel() {
+    try {
+      zpleSparkInterpreter.cancel(intpContext);
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public int getProgress() {
+    int process = 0;
+    try {
+      process = zpleSparkInterpreter.getProgress(intpContext);
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    return process;
+  }
+
+  @Override
+  public boolean test() {
+    open();
+    String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" 
+
+        "df.show()";
+    InterpreterResult result = interpret(code);
+    LOG.info("Execution Spark Interpreter, Calculation Spark Code  {}, Result 
= {}",
+             code, result.message().get(0).getData());
+
+    if (result.code() != InterpreterResult.Code.SUCCESS) {
+      close();
+      return false;
+    }
+    boolean success = (result.message().get(0).getData().contains(
+            "+---+----+\n" +
+                "| _1|  _2|\n" +
+                "+---+----+\n" +
+                "|  1|   a|\n" +
+                "|  2|null|\n" +
+                "+---+----+"));
+    close();
+    return success;
+  }
+}
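
The classloader setup in `open()` above follows a standard pattern: collect every jar in a directory into a `URLClassLoader` layered over the current context classloader, then install it on the thread before the Zeppelin Spark interpreter loads its scala-2.11 classes. One detail worth knowing: `File.listFiles()` returns null when the directory is missing, so the loop in `open()` would throw a NullPointerException if the `spark/scala-2.11` folder is absent. Below is a self-contained sketch of the same pattern with that case guarded; the class and method names are illustrative, not part of this commit.

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public final class JarFolderClassLoader {
  // Builds a URLClassLoader over every file in `folder`, parented to the current
  // thread's context classloader, and installs it on the thread.
  public static ClassLoader installFrom(File folder) throws Exception {
    File[] jars = folder.listFiles();
    if (jars == null) {
      // listFiles() returns null for a missing or unreadable directory
      throw new IllegalStateException("Not a readable directory: " + folder);
    }
    List<URL> urls = new ArrayList<>();
    for (File jar : jars) {
      urls.add(jar.toURI().toURL());
    }
    URLClassLoader loader = new URLClassLoader(
        urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
    Thread.currentThread().setContextClassLoader(loader);
    return loader;
  }
}
```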
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
new file mode 100644
index 0000000..9672e21
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.interpreter;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SparkInterpreterTest {
+  private SparkInterpreter interpreter;
+
+  // catch the streaming output in onAppend
+  private volatile String output = "";
+  // catch the interpreter output in onUpdate
+  private InterpreterResultMessageOutput messageOutput;
+
+  private RemoteInterpreterEventClient mockRemoteEventClient;
+
+  @Before
+  public void setUp() {
+    mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+  }
+
+  @Test
+  public void testSparkInterpreter() throws InterruptedException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    InterpreterContext.set(context);
+
+    interpreter = new SparkInterpreter(properties);
+    try {
+      interpreter.open();
+    } catch (Throwable ex) {
+      ex.printStackTrace();
+    }
+
+
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a: String = hello world\n", 
result.message().get(0).getData());
+
+    result = interpreter.interpret("print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    // java stdout
+    result = interpreter.interpret("System.out.print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    // incomplete
+    result = interpreter.interpret("println(a");
+    assertEquals(InterpreterResult.Code.INCOMPLETE, result.code());
+
+    // syntax error
+    result = interpreter.interpret("println(b)");
+    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertTrue(result.message().get(0).getData().contains("not found: value b"));
+
+    //multiple line
+    result = interpreter.interpret("\"123\".\ntoInt");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // single line comment
+    result = interpreter.interpret("print(\"hello world\")/*comment here*/");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("hello world", result.message().get(0).getData());
+
+    result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // multiple line comment
+    result = interpreter.interpret("/*line 1 \n line 2*/");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // test function
+    result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("print(add(1,2))");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello 
world\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // Companion object with case class
+    result = interpreter.interpret("import scala.math._\n" +
+        "object Circle {\n" +
+        "  private def calculateArea(radius: Double): Double = Pi * 
pow(radius, 2.0)\n" +
+        "}\n" +
+        "case class Circle(radius: Double) {\n" +
+        "  import Circle._\n" +
+        "  def area: Double = calculateArea(radius)\n" +
+        "}\n" +
+        "\n" +
+        "val circle1 = new Circle(5.0)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // class extend
+    result = interpreter.interpret("import java.util.ArrayList");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    context.setParagraphId("pid_1");
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("45"));
+    result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(result.message().get(0).getData().contains("45"));
+    result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, edu : 
String, balance : Integer)\n" +
+            "val bank = bankText.map(s=>s.split(\";\")).filter(s => 
s(0)!=\"\\\"age\\\"\").map(\n" +
+            "    s => Bank(s(0).toInt, \n" +
+            "            s(1).replaceAll(\"\\\"\", \"\"),\n" +
+            "            s(2).replaceAll(\"\\\"\", \"\"),\n" +
+            "            s(3).replaceAll(\"\\\"\", \"\"),\n" +
+            "            s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+            "        )\n" +
+            ").toDF()");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark version
+    result = interpreter.interpret("sc.version");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // spark sql test
+    String version = result.message().get(0).getData().trim();
+    if (version.contains("String = 1.")) {
+      result = interpreter.interpret("sqlContext");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      result = interpreter.interpret(
+          "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+              "df.show()");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains(
+          "+---+----+\n" +
+              "| _1|  _2|\n" +
+              "+---+----+\n" +
+              "|  1|   a|\n" +
+              "|  2|null|\n" +
+              "+---+----+"));
+    } else if (version.contains("String = 2.")) {
+      result = interpreter.interpret("spark");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+      result = interpreter.interpret(
+          "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+              "df.show()");
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertTrue(result.message().get(0).getData().contains(
+          "+---+----+\n" +
+              "| _1|  _2|\n" +
+              "+---+----+\n" +
+              "|  1|   a|\n" +
+              "|  2|null|\n" +
+              "+---+----+"));
+    }
+
+    // ZeppelinContext
+    result = interpreter.interpret("z.show(df)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+    assertEquals("_1\t_2\n1\ta\n2\tnull\n", result.message().get(0).getData());
+
+    result = interpreter.interpret("z.input(\"name\", \"default_name\")");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // getProgress;
+    Thread interpretThread = new Thread() {
+      @Override
+      public void run() {
+        InterpreterResult result = null;
+        result = interpreter.interpret(
+            "val df = sc.parallelize(1 to 10, 
5).foreach(e=>Thread.sleep(1000))");
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      }
+    };
+    interpretThread.start();
+    boolean nonZeroProgress = false;
+    int progress = 0;
+    while (interpretThread.isAlive()) {
+      progress = interpreter.getProgress();
+      assertTrue(progress >= 0);
+      if (progress != 0 && progress != 100) {
+        nonZeroProgress = true;
+      }
+      Thread.sleep(100);
+    }
+    assertTrue(nonZeroProgress);
+
+    interpretThread = new Thread() {
+      @Override
+      public void run() {
+        InterpreterResult result = null;
+        result = interpreter.interpret(
+            "val df = sc.parallelize(1 to 10, 
2).foreach(e=>Thread.sleep(1000))");
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        assertTrue(result.message().get(0).getData().contains("cancelled"));
+      }
+    };
+
+    interpretThread.start();
+    // sleep 1 second to wait for the spark job start
+    Thread.sleep(1000);
+    interpreter.cancel();
+    interpretThread.join();
+  }
+
+  @Test
+  public void testDisableReplOutput() {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.printREPLOutput", "false");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    InterpreterContext.set(getInterpreterContext());
+    interpreter = new SparkInterpreter(properties);
+    interpreter.open();
+
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // no output for define new variable
+    assertEquals("", output);
+
+    result = interpreter.interpret("print(a)");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // output from print statement will still be displayed
+    assertEquals("hello world", result.message().get(0).getData());
+  }
+
+  @Test
+  public void testSchedulePool() {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("spark.scheduler.mode", "FAIR");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter(properties);
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
+    // pool is reset to null if the user doesn't specify it via paragraph properties
+    result = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+  // spark.ui.enabled: false
+  @Test
+  public void testDisableSparkUI_1() {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("spark.ui.enabled", "false");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter(properties);
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+  }
+
+  // zeppelin.spark.ui.hidden: true
+  @Test
+  public void testDisableSparkUI_2() {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.ui.hidden", "true");
+    // disable color output for easy testing
+    properties.setProperty("zeppelin.spark.scala.color", "false");
+    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+
+    interpreter = new SparkInterpreter(properties);
+    InterpreterContext.set(getInterpreterContext());
+    interpreter.open();
+
+    InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+  }
+
+
+  @After
+  public void tearDown() {
+    if (this.interpreter != null) {
+      this.interpreter.close();
+    }
+  }
+
+  private InterpreterContext getInterpreterContext() {
+    output = "";
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
+    context.out = new InterpreterOutput(
+        new InterpreterOutputListener() {
+          @Override
+          public void onUpdateAll(InterpreterOutput out) {
+
+          }
+
+          @Override
+          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+            try {
+              output = out.toInterpreterResultMessage().getData();
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+
+          @Override
+          public void onUpdate(int index, InterpreterResultMessageOutput out) {
+            messageOutput = out;
+          }
+        }
+    );
+    return context;
+  }
+}

