Unsubscribe On Fri, Jul 15, 2022, 06:34 <jongy...@apache.org> wrote:
> This is an automated email from the ASF dual-hosted git repository. > > jongyoul pushed a commit to branch master > in repository https://gitbox.apache.org/repos/asf/zeppelin.git > > > The following commit(s) were added to refs/heads/master by this push: > new 11d82c4bda [ZEPPELIN-5764] Remove Scalding interpreter (#4405) > 11d82c4bda is described below > > commit 11d82c4bda72abe276f8c4309f617569c3f4a57c > Author: 김민수 <alstnals...@gmail.com> > AuthorDate: Fri Jul 15 13:34:12 2022 +0900 > > [ZEPPELIN-5764] Remove Scalding interpreter (#4405) > > * [ZEPPELIN-5764] Remove Scalding interpreter > > * [ZEPPELIN-5764] remove scalding interpreter docs > > in how_to_build.md > remove scalding interpreter command example, scalding keword > --- > .github/workflows/core.yml | 2 +- > conf/interpreter-list | 1 - > dev/create_release.sh | 2 +- > docs/_includes/themes/zeppelin/_navigation.html | 1 - > docs/index.md | 1 - > docs/interpreter/scalding.md | 168 ------------- > docs/setup/basics/how_to_build.md | 8 +- > docs/usage/interpreter/installation.md | 16 -- > pom.xml | 1 - > scalding/pom.xml | 197 --------------- > .../zeppelin/scalding/ScaldingInterpreter.java | 280 > --------------------- > .../src/main/resources/interpreter-setting.json | 21 -- > .../zeppelin/scalding/ZeppelinReplState.scala | 48 ---- > .../zeppelin/scalding/ZeppelinScaldingLoop.scala | 46 ---- > .../zeppelin/scalding/ZeppelinScaldingShell.scala | 72 ------ > .../zeppelin/scalding/ScaldingInterpreterTest.java | 144 ----------- > 16 files changed, 3 insertions(+), 1005 deletions(-) > > diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml > index 4b3e42a369..db075e1bc2 100644 > --- a/.github/workflows/core.yml > +++ b/.github/workflows/core.yml > @@ -82,7 +82,7 @@ jobs: > interpreter-test-non-core: > runs-on: ubuntu-20.04 > env: > - INTERPRETERS: > 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql,scalding' > + INTERPRETERS: > 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql' > steps: > - name: Checkout > uses: actions/checkout@v2 > diff --git a/conf/interpreter-list b/conf/interpreter-list > index 76584969c0..270d243b5c 100644 > --- a/conf/interpreter-list > +++ b/conf/interpreter-list > @@ -39,7 +39,6 @@ neo4j > org.apache.zeppelin:zeppelin-neo4j:0.10.0 Neo4j i > pig org.apache.zeppelin:zeppelin-pig:0.10.0 > Pig interpreter > python org.apache.zeppelin:zeppelin-python:0.10.0 > Python interpreter > sap org.apache.zeppelin:zeppelin-sap:0.10.0 > SAP Support > -scalding org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0 > Scalding interpreter > scio org.apache.zeppelin:zeppelin-scio:0.10.0 > Scio interpreter > shell org.apache.zeppelin:zeppelin-shell:0.10.0 > Shell command > sparql org.apache.zeppelin:zeppelin-sparql:0.10.0 > Sparql interpreter > diff --git a/dev/create_release.sh b/dev/create_release.sh > index a3bef0d1d5..ae2162aa9b 100755 > --- a/dev/create_release.sh > +++ b/dev/create_release.sh > @@ -97,7 +97,7 @@ function make_binary_release() { > > git_clone > make_source_package > -make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl > !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql,!scalding > -am" > +make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl > !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql > -am" > make_binary_release all "-Pweb-angular -Phadoop-2.6" > > # remove non release files and dirs > diff --git a/docs/_includes/themes/zeppelin/_navigation.html > b/docs/_includes/themes/zeppelin/_navigation.html > index 205e8fc7fc..ceed569605 100644 > --- a/docs/_includes/themes/zeppelin/_navigation.html > +++ b/docs/_includes/themes/zeppelin/_navigation.html > @@ -163,7 +163,6 @@ > <li><a > href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li> > <li><a > href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li> > <li><a > href="{{BASE_PATH}}/interpreter/sap.html">SAP</a></li> > - <li><a > href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li> > <li><a > href="{{BASE_PATH}}/interpreter/scio.html">Scio</a></li> > <li><a > href="{{BASE_PATH}}/interpreter/shell.html">Shell</a></li> > <li><a > href="{{BASE_PATH}}/interpreter/sparql.html">Sparql</a></li> > diff --git a/docs/index.md b/docs/index.md > index d955496160..d3e5a461d5 100644 > --- a/docs/index.md > +++ b/docs/index.md > @@ -163,7 +163,6 @@ limitations under the License. > * [Python](./interpreter/python.html) > * [R](./interpreter/r.html) > * [SAP](./interpreter/sap.html) > - * [Scalding](./interpreter/scalding.html) > * [Scio](./interpreter/scio.html) > * [Shell](./interpreter/shell.html) > * [Spark](./interpreter/spark.html) > diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md > deleted file mode 100644 > index 1d55e59a38..0000000000 > --- a/docs/interpreter/scalding.md > +++ /dev/null > @@ -1,168 +0,0 @@ > ---- > -layout: page > -title: "Scalding Interpreter for Apache Zeppelin" > -description: "Scalding is an open source Scala library for writing > MapReduce jobs." > -group: interpreter > ---- > -<!-- > -Licensed under the Apache License, Version 2.0 (the "License"); > -you may not use this file except in compliance with the License. > -You may obtain a copy of the License at > - > -http://www.apache.org/licenses/LICENSE-2.0 > - > -Unless required by applicable law or agreed to in writing, software > -distributed under the License is distributed on an "AS IS" BASIS, > -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > -See the License for the specific language governing permissions and > -limitations under the License. > ---> > -{% include JB/setup %} > - > -# Scalding Interpreter for Apache Zeppelin > - > -<div id="toc"></div> > - > -[Scalding](https://github.com/twitter/scalding) is an open source Scala > library for writing MapReduce jobs. > - > -## Building the Scalding Interpreter > -You have to first build the Scalding interpreter by enable the > **scalding** profile as follows: > - > -```bash > -./mvnw clean package -Pscalding -DskipTests > -``` > - > -## Enabling the Scalding Interpreter > -In a notebook, to enable the **Scalding** interpreter, click on the > **Gear** icon,select **Scalding**, and hit **Save**. > - > -<center> > - > - > - > - > - > -</center> > - > -## Configuring the Interpreter > - > -Scalding interpreter runs in two modes: > - > -* local > -* hdfs > - > -In the local mode, you can access files on the local server and scalding > transformation are done locally. > - > -In hdfs mode you can access files in HDFS and scalding transformation are > run as hadoop map-reduce jobs. > - > -Zeppelin comes with a pre-configured Scalding interpreter in local mode. > - > -To run the scalding interpreter in the hdfs mode you have to do the > following: > - > -**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES** > - > -In conf/zeppelin_env.sh, you have to set > -ZEPPELIN_CLASSPATH_OVERRIDES to the contents of 'hadoop classpath' > -and directories with custom jar files you need for your scalding commands. > - > -**Set arguments to the scalding repl** > - > -The default arguments are: `--local --repl` > - > -For hdfs mode you need to add: `--hdfs --repl` > - > -If you want to add custom jars, you need to add: `-libjars > directory/*:directory/*` > - > -For reducer estimation, you need to add something like: > > -`-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator` > - > -**Set max.open.instances** > - > -If you want to control the maximum number of open interpreters, you have > to select "scoped" interpreter for note > -option and set `max.open.instances` argument. > - > -## Testing the Interpreter > - > -### Local mode > - > -In example, by using the [Alice in Wonderland]( > https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, > -we will count words (of course!), and plot a graph of the top 10 words in > the book. > - > -```scala > -%scalding > - > -import scala.io.Source > - > -// Get the Alice in Wonderland book from gutenberg.org: > -val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt > ").getLines > -val aliceLineNum = alice.zipWithIndex.toList > -val alicePipe = TypedPipe.from(aliceLineNum) > - > -// Now get a list of words for the book: > -val aliceWords = alicePipe.flatMap { case (text, _) => > text.split("\\s+").toList } > - > -// Now lets add a count for each word: > -val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => > (word, 1L) } > - > -// let's sum them for each word: > -val wordCount = aliceWithCount.group.sum > - > -print ("Here are the top 10 words\n") > -val top10 = wordCount > - .groupAll > - .sortBy { case (word, count) => -count } > - .take(10) > -top10.dump > - > -``` > -``` > -%scalding > - > -val table = "words\t count\n" + top10.toIterator.map{case (k, (word, > count)) => s"$word\t$count"}.mkString("\n") > -print("%table " + table) > - > -``` > - > -If you click on the icon for the pie chart, you should be able to see a > chart like this: > - > - > - > -### HDFS mode > - > -**Test mode** > - > -``` > -%scalding > -mode > -``` > -This command should print: > - > -``` > -res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: > core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, > yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) > -``` > - > - > -**Test HDFS read** > - > -```scala > -val testfile = TypedPipe.from(TextLine("/user/x/testfile")) > -testfile.dump > -``` > - > -This command should print the contents of the hdfs file /user/x/testfile. > - > -**Test map-reduce job** > - > -```scala > -val testfile = TypedPipe.from(TextLine("/user/x/testfile")) > -val a = testfile.groupAll.size.values > -a.toList > - > -``` > - > -This command should create a map reduce job. > - > -## Future Work > -* Better user feedback (hadoop url, progress updates) > -* Ability to cancel jobs > -* Ability to dynamically load jars without restarting the interpreter > -* Multiuser scalability (run scalding interpreters on different servers) > diff --git a/docs/setup/basics/how_to_build.md > b/docs/setup/basics/how_to_build.md > index 56715a2fec..df5620af29 100644 > --- a/docs/setup/basics/how_to_build.md > +++ b/docs/setup/basics/how_to_build.md > @@ -82,7 +82,7 @@ You can directly start Zeppelin by running the following > command after successfu > > #### Scala profile > > -To be noticed, this scala profile affect the modules (e.g. cassandra, > scalding) that use scala except Spark interpreter (Spark interpreter use > other profiles to control its scala version, see the doc below). > +To be noticed, this scala profile affect the modules (e.g. cassandra) > that use scala except Spark interpreter (Spark interpreter use other > profiles to control its scala version, see the doc below). > > Set scala version (default 2.10). Available profiles are > > @@ -170,12 +170,6 @@ Ignite Interpreter > ./mvnw clean package -Dignite.version=1.9.0 -DskipTests > ``` > > -Scalding Interpreter > - > -```bash > -./mvnw clean package -Pscalding -DskipTests > -``` > - > ### Optional configurations > > Here are additional configurations that could be optionally tuned using > the trailing `-D` option for maven commands > diff --git a/docs/usage/interpreter/installation.md > b/docs/usage/interpreter/installation.md > index 2e26b19896..aaea7b8ebc 100644 > --- a/docs/usage/interpreter/installation.md > +++ b/docs/usage/interpreter/installation.md > @@ -61,19 +61,8 @@ Zeppelin support both Scala 2.10 and 2.11 for several > interpreters as below: > <td>org.apache.zeppelin:zeppelin-spark_2.10:0.10.0</td> > <td>org.apache.zeppelin:zeppelin-spark_2.11:0.10.0</td> > </tr> > - <tr> > - <td>scalding</td> > - <td>org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0</td> > - <td>org.apache.zeppelin:zeppelin-scalding_2.11:0.10.0</td> > - </tr> > </table> > > -If you install one of these interpreters only with `--name` option, > installer will download interpreter built with Scala 2.11 by default. If > you want to specify Scala version, you will need to add `--artifact` > option. Here is the example of installing flink interpreter built with > Scala 2.10. > - > -```bash > -./bin/install-interpreter.sh --name flink --artifact > org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0 > -``` > - > #### Install Spark interpreter built with Scala 2.10 > > Spark distribution package has been built with Scala 2.10 until 1.6.2. If > you have `SPARK_HOME` set pointing to Spark version earlier than 2.0.0, you > need to download Spark interpreter packaged with Scala 2.10. To do so, use > follow command: > @@ -223,11 +212,6 @@ You can also find the below community managed > interpreter list in `conf/interpre > <td>org.apache.zeppelin:zeppelin-sap:0.10.0</td> > <td>SAP support</td> > </tr> > - <tr> > - <td>scalding</td> > - <td>org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0</td> > - <td>Scalding interpreter</td> > - </tr> > <tr> > <td>scio</td> > <td>org.apache.zeppelin:zeppelin-scio:0.10.0</td> > diff --git a/pom.xml b/pom.xml > index da5b999307..b4c1814c9a 100644 > --- a/pom.xml > +++ b/pom.xml > @@ -86,7 +86,6 @@ > <module>scio</module> > <module>neo4j</module> > <module>sap</module> > - <module>scalding</module> > <module>java</module> > <module>beam</module> > <module>hazelcastjet</module> > diff --git a/scalding/pom.xml b/scalding/pom.xml > deleted file mode 100644 > index 54beb0b9d4..0000000000 > --- a/scalding/pom.xml > +++ /dev/null > @@ -1,197 +0,0 @@ > -<?xml version="1.0" encoding="UTF-8"?> > -<!-- > - ~ Licensed to the Apache Software Foundation (ASF) under one or more > - ~ contributor license agreements. See the NOTICE file distributed with > - ~ this work for additional information regarding copyright ownership. > - ~ The ASF licenses this file to You under the Apache License, Version > 2.0 > - ~ (the "License"); you may not use this file except in compliance with > - ~ the License. You may obtain a copy of the License at > - ~ > - ~ http://www.apache.org/licenses/LICENSE-2.0 > - ~ > - ~ Unless required by applicable law or agreed to in writing, software > - ~ distributed under the License is distributed on an "AS IS" BASIS, > - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - ~ See the License for the specific language governing permissions and > - ~ limitations under the License. > - --> > - > -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=" > http://www.w3.org/2001/XMLSchema-instance" > - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > https://maven.apache.org/xsd/maven-4.0.0.xsd"> > - <modelVersion>4.0.0</modelVersion> > - > - <parent> > - <artifactId>zeppelin-interpreter-parent</artifactId> > - <groupId>org.apache.zeppelin</groupId> > - <version>0.11.0-SNAPSHOT</version> > - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> > - </parent> > - > - <artifactId>zeppelin-scalding_2.10</artifactId> > - <packaging>jar</packaging> > - <name>Zeppelin: Scalding interpreter</name> > - > - <properties> > - <interpreter.name>scalding</interpreter.name> > - <!--library versions--> > - <hadoop.version>${hadoop2.6.version}</hadoop.version> > - <scalding.version>0.16.1-RC1</scalding.version> > - > - <!--plugin versions--> > - <plugin.scala.version>2.15.2</plugin.scala.version> > - </properties> > - > - <repositories> > - <repository> > - <id>conjars</id> > - <name>Concurrent Maven Repo</name> > - <url>https://conjars.org/repo</url> > - </repository> > - > - <!-- the twitter repo is unreliable ( > https://github.com/twitter/hadoop-lzo/issues/148) --> > - <repository> > - <id>twitter</id> > - <name>Twitter Maven Repo</name> > - <url>https://maven.twttr.com</url> > - </repository> > - > - <!-- Temporary repo --> > - <repository> > - <id>zeppelin-dependencies</id> > - <name>bintray</name> > - <url>https://jetbrains.bintray.com/zeppelin-dependencies</url> > - </repository> > - </repositories> > - > - <dependencies> > - > - <dependency> > - <groupId>org.apache.commons</groupId> > - <artifactId>commons-exec</artifactId> > - <version>${commons.exec.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-core_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-args_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-date_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-commons_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-avro_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-parquet_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>com.twitter</groupId> > - <artifactId>scalding-repl_${scala.binary.version}</artifactId> > - <version>${scalding.version}</version> > - </dependency> > - > - <dependency> > - <groupId>org.scala-lang</groupId> > - <artifactId>scala-library</artifactId> > - <version>${scala.version}</version> > - </dependency> > - > - <dependency> > - <groupId>org.scala-lang</groupId> > - <artifactId>scala-compiler</artifactId> > - <version>${scala.version}</version> > - </dependency> > - > - <dependency> > - <groupId>org.scala-lang</groupId> > - <artifactId>scala-reflect</artifactId> > - <version>${scala.version}</version> > - </dependency> > - > - <dependency> > - <groupId>org.apache.hadoop</groupId> > - <artifactId>hadoop-client</artifactId> > - <version>${hadoop.version}</version> > - </dependency> > - > - <dependency> > - <groupId>org.apache.hadoop</groupId> > - <artifactId>hadoop-common</artifactId> > - <version>${hadoop.version}</version> > - </dependency> > - > - </dependencies> > - > - <build> > - <plugins> > - <plugin> > - <artifactId>maven-enforcer-plugin</artifactId> > - </plugin> > - <plugin> > - <artifactId>maven-resources-plugin</artifactId> > - </plugin> > - <plugin> > - <artifactId>maven-shade-plugin</artifactId> > - </plugin> > - <!-- Plugin to compile Scala code --> > - <plugin> > - <groupId>org.scala-tools</groupId> > - <artifactId>maven-scala-plugin</artifactId> > - <executions> > - <execution> > - <id>compile</id> > - <goals> > - <goal>compile</goal> > - </goals> > - <phase>compile</phase> > - </execution> > - <execution> > - <id>test-compile</id> > - <goals> > - <goal>testCompile</goal> > - </goals> > - <phase>test-compile</phase> > - </execution> > - <execution> > - <phase>process-resources</phase> > - <goals> > - <goal>compile</goal> > - </goals> > - </execution> > - </executions> > - </plugin> > - > - <plugin> > - <groupId>org.apache.maven.plugins</groupId> > - <artifactId>maven-checkstyle-plugin</artifactId> > - <configuration> > - <skip>false</skip> > - </configuration> > - </plugin> > - </plugins> > - </build> > - > -</project> > diff --git > a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java > b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java > deleted file mode 100644 > index f104a587bb..0000000000 > --- > a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java > +++ /dev/null > @@ -1,280 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache License, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.zeppelin.scalding; > - > -import org.apache.hadoop.security.UserGroupInformation; > -import org.slf4j.Logger; > -import org.slf4j.LoggerFactory; > - > -import java.io.ByteArrayOutputStream; > -import java.io.IOException; > -import java.io.PrintStream; > -import java.io.PrintWriter; > -import java.security.PrivilegedExceptionAction; > -import java.util.ArrayList; > -import java.util.Arrays; > -import java.util.Collections; > -import java.util.List; > -import java.util.Properties; > - > -import com.twitter.scalding.ScaldingILoop; > - > -import scala.Console; > - > -import org.apache.zeppelin.interpreter.Interpreter; > -import org.apache.zeppelin.interpreter.InterpreterContext; > -import org.apache.zeppelin.interpreter.InterpreterResult; > -import org.apache.zeppelin.interpreter.InterpreterResult.Code; > -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; > -import org.apache.zeppelin.scheduler.Scheduler; > -import org.apache.zeppelin.scheduler.SchedulerFactory; > - > -/** > - * Scalding interpreter for Zeppelin. Based off the Spark interpreter > code. > - * > - */ > -public class ScaldingInterpreter extends Interpreter { > - public static final Logger LOGGER = > LoggerFactory.getLogger(ScaldingInterpreter.class); > - > - static final String ARGS_STRING = "args.string"; > - static final String ARGS_STRING_DEFAULT = "--local --repl"; > - static final String MAX_OPEN_INSTANCES = "max.open.instances"; > - static final String MAX_OPEN_INSTANCES_DEFAULT = "50"; > - > - public static final List NO_COMPLETION = > Collections.unmodifiableList(new ArrayList<>()); > - > - static int numOpenInstances = 0; > - private ScaldingILoop interpreter; > - private ByteArrayOutputStream out; > - > - public ScaldingInterpreter(Properties property) { > - super(property); > - out = new ByteArrayOutputStream(); > - } > - > - @Override > - public void open() { > - numOpenInstances = numOpenInstances + 1; > - String maxOpenInstancesStr = getProperty(MAX_OPEN_INSTANCES, > - MAX_OPEN_INSTANCES_DEFAULT); > - int maxOpenInstances = 50; > - try { > - maxOpenInstances = Integer.valueOf(maxOpenInstancesStr); > - } catch (Exception e) { > - LOGGER.error("Error reading max.open.instances", e); > - } > - LOGGER.info("max.open.instances = {}", maxOpenInstances); > - if (numOpenInstances > maxOpenInstances) { > - LOGGER.error("Reached maximum number of open instances"); > - return; > - } > - LOGGER.info("Opening instance {}", numOpenInstances); > - LOGGER.info("property: {}", getProperties()); > - String argsString = getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); > - String[] args; > - if (argsString == null) { > - args = new String[0]; > - } else { > - args = argsString.split(" "); > - } > - LOGGER.info("{}", Arrays.toString(args)); > - > - PrintWriter printWriter = new PrintWriter(out, true); > - interpreter = ZeppelinScaldingShell.getRepl(args, printWriter); > - interpreter.createInterpreter(); > - } > - > - @Override > - public void close() { > - interpreter.intp().close(); > - } > - > - > - @Override > - public InterpreterResult interpret(String cmd, InterpreterContext > contextInterpreter) { > - String user = contextInterpreter.getAuthenticationInfo().getUser(); > - LOGGER.info("Running Scalding command: user: {} cmd: '{}'", user, > cmd); > - > - if (interpreter == null) { > - LOGGER.error( > - "interpreter == null, open may not have been called because > max.open.instances reached"); > - return new InterpreterResult(Code.ERROR, > - "interpreter == null\n" + > - "open may not have been called because max.open.instances reached" > - ); > - } > - if (cmd == null || cmd.trim().length() == 0) { > - return new InterpreterResult(Code.SUCCESS); > - } > - InterpreterResult interpreterResult = new > InterpreterResult(Code.ERROR); > - if (getProperty(ARGS_STRING).contains("hdfs")) { > - UserGroupInformation ugi = null; > - try { > - ugi = UserGroupInformation.createProxyUser(user, > UserGroupInformation.getLoginUser()); > - } catch (IOException e) { > - LOGGER.error("Error creating UserGroupInformation", e); > - return new InterpreterResult(Code.ERROR, e.getMessage()); > - } > - try { > - // Make variables final to avoid "local variable is accessed from > within inner class; > - // needs to be declared final" exception in JDK7 > - final String cmd1 = cmd; > - final InterpreterContext contextInterpreter1 = contextInterpreter; > - PrivilegedExceptionAction<InterpreterResult> action = > - new PrivilegedExceptionAction<InterpreterResult>() { > - @Override > - public InterpreterResult run() throws Exception { > - return interpret(cmd1.split("\n"), contextInterpreter1); > - } > - }; > - interpreterResult = ugi.doAs(action); > - } catch (Exception e) { > - LOGGER.error("Error running command with ugi.doAs", e); > - return new InterpreterResult(Code.ERROR, e.getMessage()); > - } > - } else { > - interpreterResult = interpret(cmd.split("\n"), contextInterpreter); > - } > - return interpreterResult; > - } > - > - public InterpreterResult interpret(String[] lines, InterpreterContext > context) { > - synchronized (this) { > - InterpreterResult r = interpretInput(lines); > - return r; > - } > - } > - > - public InterpreterResult interpretInput(String[] lines) { > - > - // add print("") to make sure not finishing with comment > - // see https://github.com/NFLabs/zeppelin/issues/151 > - String[] linesToRun = new String[lines.length + 1]; > - for (int i = 0; i < lines.length; i++) { > - linesToRun[i] = lines[i]; > - } > - linesToRun[lines.length] = "print(\"\")"; > - > - out.reset(); > - > - // Moving two lines below from open() to this function. > - // If they are in open output is incomplete. > - PrintStream printStream = new PrintStream(out, true); > - Console.setOut(printStream); > - > - Code r = null; > - String incomplete = ""; > - boolean inComment = false; > - > - for (int l = 0; l < linesToRun.length; l++) { > - String s = linesToRun[l]; > - // check if next line starts with "." (but not ".." or "./") it is > treated as an invocation > - if (l + 1 < linesToRun.length) { > - String nextLine = linesToRun[l + 1].trim(); > - boolean continuation = false; > - if (nextLine.isEmpty() > - || nextLine.startsWith("//") // skip empty line > or comment > - || nextLine.startsWith("}") > - || nextLine.startsWith("object")) { // include "} object" > for Scala companion object > - continuation = true; > - } else if (!inComment && nextLine.startsWith("/*")) { > - inComment = true; > - continuation = true; > - } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { > - inComment = false; > - continuation = true; > - } else if (nextLine.length() > 1 > - && nextLine.charAt(0) == '.' > - && nextLine.charAt(1) != '.' // ".." > - && nextLine.charAt(1) != '/') { // "./" > - continuation = true; > - } else if (inComment) { > - continuation = true; > - } > - if (continuation) { > - incomplete += s + "\n"; > - continue; > - } > - } > - > - scala.tools.nsc.interpreter.Results.Result res = null; > - try { > - res = interpreter.intp().interpret(incomplete + s); > - } catch (Exception e) { > - LOGGER.error("Interpreter exception: ", e); > - return new InterpreterResult(Code.ERROR, e.getMessage()); > - } > - > - r = getResultCode(res); > - > - if (r == Code.ERROR) { > - Console.flush(); > - return new InterpreterResult(r, out.toString()); > - } else if (r == Code.INCOMPLETE) { > - incomplete += s + "\n"; > - } else { > - incomplete = ""; > - } > - } > - if (r == Code.INCOMPLETE) { > - return new InterpreterResult(r, "Incomplete expression"); > - } else { > - Console.flush(); > - return new InterpreterResult(r, out.toString()); > - } > - } > - > - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result > r) { > - if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { > - return Code.SUCCESS; > - } else if (r instanceof > scala.tools.nsc.interpreter.Results.Incomplete$) { > - return Code.INCOMPLETE; > - } else { > - return Code.ERROR; > - } > - } > - > - @Override > - public void cancel(InterpreterContext context) { > - // not implemented > - } > - > - @Override > - public FormType getFormType() { > - return FormType.NATIVE; > - } > - > - @Override > - public int getProgress(InterpreterContext context) { > - // fine-grained progress not implemented - return 0 > - return 0; > - } > - > - @Override > - public Scheduler getScheduler() { > - return SchedulerFactory.singleton().createOrGetFIFOScheduler( > - ScaldingInterpreter.class.getName() + this.hashCode()); > - } > - > - @Override > - public List<InterpreterCompletion> completion(String buf, int cursor, > - InterpreterContext interpreterContext) { > - return NO_COMPLETION; > - } > - > -} > diff --git a/scalding/src/main/resources/interpreter-setting.json > b/scalding/src/main/resources/interpreter-setting.json > deleted file mode 100644 > index ca6cd9295a..0000000000 > --- a/scalding/src/main/resources/interpreter-setting.json > +++ /dev/null > @@ -1,21 +0,0 @@ > -[ > - { > - "group": "scalding", > - "name": "scalding", > - "className": "org.apache.zeppelin.scalding.ScaldingInterpreter", > - "properties": { > - "args.string": { > - "envName": null, > - "defaultValue": "--local --repl", > - "description": "Arguments for scalding REPL", > - "type": "textarea" > - }, > - "max.open.instances": { > - "envName": null, > - "defaultValue": "50", > - "description": "Maximum number of open interpreter instances", > - "type": "number" > - } > - } > - } > -] > diff --git > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala > b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala > deleted file mode 100644 > index b847eba001..0000000000 > --- > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala > +++ /dev/null > @@ -1,48 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache License, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.zeppelin.scalding > - > -/** > - * Stores REPL state > - */ > - > -import cascading.flow.FlowDef > -import com.twitter.scalding.BaseReplState > -import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } > -import scala.concurrent.Future > -import scala.util.{Failure, Success} > - > -object ZeppelinReplState extends BaseReplState { > - override def shell = ZeppelinScaldingShell > -} > - > -/** > - * Implicit FlowDef and Mode, import in the REPL to have the global > context implicitly > - * used everywhere. > - */ > -object ZeppelinReplImplicitContext { > - /** Implicit execution context for using the Execution monad */ > - implicit val executionContext = ConcurrentExecutionContext.global > - /** Implicit repl state used for ShellPipes */ > - implicit def stateImpl = ZeppelinReplState > - /** Implicit flowDef for this Scalding shell session. */ > - implicit def flowDefImpl = ZeppelinReplState.flowDef > - /** Defaults to running in local mode if no mode is specified. */ > - implicit def modeImpl = ZeppelinReplState.mode > - implicit def configImpl = ZeppelinReplState.config > -} > diff --git > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala > b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala > deleted file mode 100644 > index 9be0199869..0000000000 > --- > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala > +++ /dev/null > @@ -1,46 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache License, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.zeppelin.scalding > - > -import java.io.BufferedReader > -import com.twitter.scalding.ScaldingILoop > - > -import scala.tools.nsc.interpreter._ > - > -/** > - * TBD > - */ > -class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) > - extends ScaldingILoop(in, out) { > - > - override protected def imports = List( > - "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, > ScaldingShell => ScaldingScaldingShell, _ }", > - // ReplImplicits minus fields API parts (esp FieldConversions) > - """com.twitter.scalding.ReplImplicits.{ > - iterableToSource, > - keyedListLikeToShellTypedPipe, > - typedPipeToShellTypedPipe, > - valuePipeToShellValuePipe > - }""", > - "com.twitter.scalding.ReplImplicits", > - "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._", > - "org.apache.zeppelin.scalding.ZeppelinReplState", > - "org.apache.zeppelin.scalding.ZeppelinReplState._" > - ) > - > -} > diff --git > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala > b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala > deleted file mode 100644 > index 29e5f835cb..0000000000 > --- > a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala > +++ /dev/null > @@ -1,72 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache License, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.zeppelin.scalding > - > -import com.twitter.scalding._ > -import com.twitter.scalding.typed.TypedPipe > -import scala.tools.nsc.{GenericRunnerCommand} > -import scala.tools.nsc.interpreter._ > - > -/** > - * TBD > - */ > -object ZeppelinScaldingShell extends BaseScaldingShell { > - > - override def replState = ZeppelinReplState > - > - def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { > - > - val argsExpanded = ExpandLibJarsGlobs(args) > - val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) > - > - // Process command line arguments into a settings object, and use > that to start the REPL. > - // We ignore params we don't care about - hence error function is > empty > - val command = new GenericRunnerCommand(cmdArgs, _ => ()) > - > - // inherit defaults for embedded interpretter (needed for running > with SBT) > - // (TypedPipe chosen arbitrarily, just needs to be something > representative) > - command.settings.embeddedDefaults[TypedPipe[String]] > - > - // if running from the assembly, need to explicitly tell it to use > java classpath > - if (args.contains("--repl")) command.settings.usejavacp.value = true > - > - > command.settings.classpath.append(System.getProperty("java.class.path")) > - > - // Force the repl to be synchronous, so all cmds are executed in the > same thread > - command.settings.Yreplsync.value = true > - > - val repl = new ZeppelinScaldingILoop(None, out) > - scaldingREPL = Some(repl) > - replState.mode = mode > - replState.customConfig = replState.customConfig ++ (mode match { > - case _: HadoopMode => cfg > - case _ => Config.empty > - }) > - > - // if in Hdfs mode, store the mode to enable switching between Local > and Hdfs > - mode match { > - case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) > - case _ => () > - } > - > - repl.settings = command.settings > - return repl; > - > - } > - > -} > diff --git > a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java > b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java > deleted file mode 100644 > index 992c15594f..0000000000 > --- > a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java > +++ /dev/null > @@ -1,144 +0,0 @@ > -/* > - * Licensed to the Apache Software Foundation (ASF) under one or more > - * contributor license agreements. See the NOTICE file distributed with > - * this work for additional information regarding copyright ownership. > - * The ASF licenses this file to You under the Apache License, Version 2.0 > - * (the "License"); you may not use this file except in compliance with > - * the License. You may obtain a copy of the License at > - * > - * http://www.apache.org/licenses/LICENSE-2.0 > - * > - * Unless required by applicable law or agreed to in writing, software > - * distributed under the License is distributed on an "AS IS" BASIS, > - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > - * See the License for the specific language governing permissions and > - * limitations under the License. > - */ > - > -package org.apache.zeppelin.scalding; > - > -import org.apache.zeppelin.interpreter.InterpreterContext; > -import org.apache.zeppelin.interpreter.InterpreterResult; > -import org.apache.zeppelin.interpreter.InterpreterResult.Code; > -import org.apache.zeppelin.user.AuthenticationInfo; > -import org.junit.After; > -import org.junit.Before; > -import org.junit.FixMethodOrder; > -import org.junit.Test; > -import org.junit.runners.MethodSorters; > - > -import java.io.File; > -import java.util.Properties; > - > -import static org.junit.Assert.assertEquals; > -import static org.junit.Assert.assertTrue; > - > -/** > - * Tests for the Scalding interpreter for Zeppelin. > - * > - */ > -@FixMethodOrder(MethodSorters.NAME_ASCENDING) > -public class ScaldingInterpreterTest { > - public static ScaldingInterpreter repl; > - private InterpreterContext context; > - private File tmpDir; > - > - @Before > - public void setUp() throws Exception { > - tmpDir = new File(System.getProperty("java.io.tmpdir") + > "/ZeppelinLTest_" + > - System.currentTimeMillis()); > - System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() > + "/local-repo"); > - > - tmpDir.mkdirs(); > - > - if (repl == null) { > - Properties p = new Properties(); > - p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl"); > - > - repl = new ScaldingInterpreter(p); > - repl.open(); > - } > - > - context = InterpreterContext.builder() > - .setNoteId("noteId") > - .setParagraphId("paragraphId") > - .setAuthenticationInfo(new AuthenticationInfo()) > - .build(); > - } > - > - @After > - public void tearDown() throws Exception { > - delete(tmpDir); > - repl.close(); > - } > - > - private void delete(File file) { > - if (file.isFile()) { > - file.delete(); > - } else if (file.isDirectory()) { > - File[] files = file.listFiles(); > - if (files != null && files.length > 0) { > - for (File f : files) { > - delete(f); > - } > - } > - file.delete(); > - } > - } > - > - @Test > - public void testNextLineComments() { > - assertEquals(InterpreterResult.Code.SUCCESS, > - repl.interpret("\"123\"\n/*comment here\n*/.toInt", > context).code()); > - } > - > - @Test > - public void testNextLineCompanionObject() { > - String code = "class Counter {\nvar value: Long = 0\n}\n // > comment\n\n object Counter " + > - "{\n def apply(x: Long) = new Counter()\n}"; > - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, > context).code()); > - } > - > - @Test > - public void testBasicIntp() { > - assertEquals(InterpreterResult.Code.SUCCESS, > - repl.interpret("val a = 1\nval b = 2", context).code()); > - > - // when interpret incomplete expression > - InterpreterResult incomplete = repl.interpret("val a = \"\"\"", > context); > - assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); > - assertTrue(incomplete.message().get(0).getData().length() > 0); // > expecting some error > - // message > - } > - > - @Test > - public void testBasicScalding() { > - assertEquals(InterpreterResult.Code.SUCCESS, > - repl.interpret("case class Sale(state: String, name: String, > sale: Int)\n" + > - "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", > \"A\", 20), " + > - "Sale(\"VA\", \"B\", 15))\n" + > - "val salesPipe = TypedPipe.from(salesList)\n" + > - "val results = salesPipe.map{x => (1, Set(x.state), > x.sale)}.\n" + > - " groupAll.sum.values.map{ case(count, set, sum) => (count, > set.size, sum) }\n" + > - "results.dump", > - context).code()); > - } > - > - @Test > - public void testNextLineInvocation() { > - assertEquals(InterpreterResult.Code.SUCCESS, > repl.interpret("\"123\"\n.toInt", context).code()); > - } > - > - @Test > - public void testEndWithComment() { > - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val > c=1\n//comment", > - context).code()); > - } > - > - @Test > - public void testReferencingUndefinedVal() { > - InterpreterResult result = repl.interpret("def category(min: Int) = {" > - + " if (0 <= value) \"error\"" + "}", context); > - assertEquals(Code.ERROR, result.code()); > - } > -} > >