Repository: incubator-zeppelin Updated Branches: refs/heads/master ec7a59c09 -> f0301cdfa
ZEPPELIN-44 Interpreter for Apache Flink #### Interpreter for [Apache Flink](http://flink.apache.org/). Flink people helped a lot to write the interpreter. Thanks so much! Some code is copied from Flink's development branch. Once Flink releases 0.9, the copied code and the snapshot repository configuration will be removed. #### Build If no options are given, the build defaults to Flink 0.9.0-milestone-1. In combination with Zeppelin, it is a good idea to use 0.9-SNAPSHOT, because it supports .collect(), which helps a lot to get result data and display it in Zeppelin. So you might want to build this way: ``` mvn package -Dflink.version=0.9-SNAPSHOT -DskipTests ``` #### Screenshot  Author: Lee moon soo <[email protected]> Closes #75 from Leemoonsoo/flink and squashes the following commits: f08bd25 [Lee moon soo] Update pom.xml after https://github.com/apache/incubator-zeppelin/pull/88 460cf46 [Lee moon soo] jarr up -> jar up 501efb3 [Lee moon soo] Add scalastyle e69e5ba [Lee moon soo] Add license 7be1f90 [Lee moon soo] Add apache snapshot repo ebbd0da [Lee moon soo] Fix unittest and update comment 27fc306 [Lee moon soo] Cleaning up f2a66df [Lee moon soo] Initial implementation of interpreter for Apache Flink Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f0301cdf Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f0301cdf Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f0301cdf Branch: refs/heads/master Commit: f0301cdfa4ba498f1a4725c7eee41823fad7fcd6 Parents: ec7a59c Author: Lee moon soo <[email protected]> Authored: Mon Jun 8 00:44:38 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Tue Jun 9 11:14:42 2015 -0700 ---------------------------------------------------------------------- _tools/scalastyle.xml | 146 +++++++ conf/zeppelin-site.xml.template | 2 +- flink/pom.xml | 386 +++++++++++++++++++ 
.../apache/zeppelin/flink/FlinkEnvironment.java | 83 ++++ .../org/apache/zeppelin/flink/FlinkIMain.java | 92 +++++ .../apache/zeppelin/flink/FlinkInterpreter.java | 312 +++++++++++++++ .../org/apache/zeppelin/flink/JarHelper.java | 219 +++++++++++ .../zeppelin/flink/FlinkInterpreterTest.java | 66 ++++ pom.xml | 1 + .../zeppelin/conf/ZeppelinConfiguration.java | 3 +- 10 files changed, 1308 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/_tools/scalastyle.xml ---------------------------------------------------------------------- diff --git a/_tools/scalastyle.xml b/_tools/scalastyle.xml new file mode 100644 index 0000000..f7bb0d4 --- /dev/null +++ b/_tools/scalastyle.xml @@ -0,0 +1,146 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<!-- NOTE: This was taken and adapted from Apache Spark. --> + +<!-- If you wish to turn off checking for a section of code, you can put a comment in the source + before and after the section, with the following syntax: --> +<!-- // scalastyle:off --> +<!-- ... --> +<!-- // naughty stuff --> +<!-- ... 
--> +<!-- // scalastyle:on --> + +<scalastyle> + <name>Scalastyle standard configuration</name> + <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check> + <!-- <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="maxFileLength"><![CDATA[800]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true"> + <parameters> + <parameter name="header"><![CDATA[/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check> + <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="false"></check> + <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check> + <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true"> + <parameters> + <parameter name="maxLineLength"><![CDATA[100]]></parameter> + <parameter name="tabSize"><![CDATA[2]]></parameter> + <parameter name="ignoreImports">true</parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true"> + <parameters> + <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter> + </parameters> + </check> + <check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="false"></check> + <!-- <check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> + <parameters> + <parameter name="maxParameters"><![CDATA[10]]></parameter> + </parameters> + </check> + <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter 
name="ignore"><![CDATA[-1,0,1,2,3]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check> + <check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check> + <!-- <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check> --> + <!-- <check level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="regex"><![CDATA[println]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="maxTypes"><![CDATA[30]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="maximum"><![CDATA[10]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check> + <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check> + <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true"> 
+ <parameters> + <parameter name="singleLineAllowed"><![CDATA[true]]></parameter> + <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter> + </parameters> + </check> + <!-- <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="maxLength"><![CDATA[50]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true"> --> + <!-- <parameters> --> + <!-- <parameter name="maxMethods"><![CDATA[30]]></parameter> --> + <!-- </parameters> --> + <!-- </check> --> + <!-- <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check> --> + <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check> + <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check> +</scalastyle> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 9f773d5..e10c85e 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -66,7 +66,7 @@ <property> <name>zeppelin.interpreters</name> - 
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value> <description>Comma separated interpreter configurations. First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml new file mode 100644 index 0000000..68aa62d --- /dev/null +++ b/flink/pom.xml @@ -0,0 +1,386 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-flink</artifactId> + <packaging>jar</packaging> + <version>0.5.0-incubating-SNAPSHOT</version> + <name>Zeppelin: Flink</name> + <description>Zeppelin flink support</description> + <url>http://zeppelin.incubator.apache.org</url> + + <properties> + <flink.version>0.9.0-milestone-1</flink.version> + <flink.akka.version>2.3.7</flink.akka.version> + <flink.scala.binary.version>2.10</flink.scala.binary.version> + <flink.scala.version>2.10.4</flink.scala.version> + <scala.macros.version>2.0.1</scala.macros.version> + </properties> + + <repositories> + <!-- for flink 0.9-SNAPSHOT. 
After 0.9 released, it can be removed --> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_${flink.scala.binary.version}</artifactId> + <version>${flink.akka.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_${flink.scala.binary.version}</artifactId> + <version>${flink.akka.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-slf4j_${flink.scala.binary.version}</artifactId> + 
<version>${flink.akka.version}</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-testkit_${flink.scala.binary.version}</artifactId> + <version>${flink.akka.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${flink.scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${flink.scala.version}</version> + </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${flink.scala.version}</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/.idea/</exclude> + <exclude>**/*.iml</exclude> + <exclude>.gitignore</exclude> + <exclude>**/.settings/*</exclude> + <exclude>**/.classpath</exclude> + <exclude>**/.project</exclude> + <exclude>**/target/**</exclude> + <exclude>**/README.md</exclude> + <exclude>dependency-reduced-pom.xml</exclude> + </excludes> + </configuration> + </plugin> + + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run scala compiler in the process-test-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + 
<id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + <compilerPlugins combine.children="append"> + <compilerPlugin> + <groupId>org.scalamacros</groupId> + <artifactId>paradise_${flink.scala.version}</artifactId> + <version>${scala.macros.version}</version> + </compilerPlugin> + </compilerPlugins> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <!-- excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes --> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + 
</configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation> + <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.17</version> + <configuration> + <forkCount>1</forkCount> + <reuseForks>false</reuseForks> + <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + 
<id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java new file mode 100644 index 0000000..629932b --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.File; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The class override execute() method to create an PlanExecutor with + * jar file that packages classes from scala compiler. 
+ */ +public class FlinkEnvironment extends ExecutionEnvironment { + Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class); + + private String host; + private int port; + + private FlinkIMain imain; + + public FlinkEnvironment(String host, int port, FlinkIMain imain) { + this.host = host; + this.port = port; + this.imain = imain; + + logger.info("jobManager host={}, port={}", host, port); + } + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + JavaPlan plan = createProgramPlan(jobName); + + File jarFile = imain.jar(); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, + jarFile.getAbsolutePath()); + + JobExecutionResult result = executor.executePlan(plan); + + if (jarFile.isFile()) { + jarFile.delete(); + } + + return result; + } + + @Override + public String getExecutionPlan() throws Exception { + JavaPlan plan = createProgramPlan("unnamed", false); + plan.setDefaultParallelism(getParallelism()); + registerCachedFilesWithPlan(plan); + + File jarFile = imain.jar(); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, + jarFile.getAbsolutePath()); + String jsonPlan = executor.getOptimizerPlanAsJSON(plan); + + if (jarFile != null && jarFile.isFile()) { + jarFile.delete(); + } + + return jsonPlan; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java new file mode 100644 index 0000000..ee6516c --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.collection.Iterator; +import scala.reflect.io.AbstractFile; +import scala.reflect.io.VirtualDirectory; +import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.IMain; + +/** + * Scala compiler + */ +public class FlinkIMain extends IMain { + Logger logger = LoggerFactory.getLogger(FlinkIMain.class); + + public FlinkIMain(Settings setting, PrintWriter out) { + super(setting, out); + } + + public File jar() throws IOException { + VirtualDirectory classDir = virtualDirectory(); + // create execution environment + File jarBuildDir = new File(System.getProperty("java.io.tmpdir") + + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis()); + jarBuildDir.mkdirs(); + + File jarFile = new File(System.getProperty("java.io.tmpdir") + + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar"); + + + Iterator<AbstractFile> vdIt = classDir.iterator(); + while (vdIt.hasNext()) { + AbstractFile fi = vdIt.next(); + if (fi.isDirectory()) { + Iterator<AbstractFile> fiIt = 
fi.iterator(); + while (fiIt.hasNext()) { + AbstractFile f = fiIt.next(); + + // directory for compiled line + File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name()); + lineDir.mkdirs(); + + // compiled classes for commands from shell + File writeFile = new File(lineDir.getAbsolutePath(), f.name()); + FileOutputStream outputStream = new FileOutputStream(writeFile); + InputStream inputStream = f.input(); + + // copy file contents + org.apache.commons.io.IOUtils.copy(inputStream, outputStream); + + inputStream.close(); + outputStream.close(); + } + } + } + + // jar up + JarHelper jh = new JarHelper(); + jh.jarDir(jarBuildDir, jarFile); + + FileUtils.deleteDirectory(jarBuildDir); + return jarFile; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java new file mode 100644 index 0000000..b342f4e --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintWriter; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Console; +import scala.None; +import scala.Some; +import scala.tools.nsc.Settings; +import scala.tools.nsc.settings.MutableSettings.BooleanSetting; +import scala.tools.nsc.settings.MutableSettings.PathSetting; + +/** + * Interpreter for Apache Flink (http://flink.apache.org) + */ +public class FlinkInterpreter extends Interpreter { + Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class); + private Settings settings; + private ByteArrayOutputStream out; + private FlinkIMain imain; + private Map<String, Object> binder; + private ExecutionEnvironment env; + private Configuration flinkConf; + private LocalFlinkMiniCluster localFlinkCluster; + + public FlinkInterpreter(Properties property) { + super(property); + } + + static { + Interpreter.register( + "flink", + "flink", + FlinkInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add("local", "true", "Run flink locally") + .add("jobmanager.rpc.address", "localhost", "Flink cluster") + 
.add("jobmanager.rpc.port", "6123", "Flink cluster") + .build() + ); + } + + @Override + public void open() { + URL[] urls = getClassloaderUrls(); + this.settings = new Settings(); + + // set classpath + PathSetting pathSettings = settings.classpath(); + String classpath = ""; + List<File> paths = currentClassPath(); + for (File f : paths) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += f.getAbsolutePath(); + } + + if (urls != null) { + for (URL u : urls) { + if (classpath.length() > 0) { + classpath += File.pathSeparator; + } + classpath += u.getFile(); + } + } + + pathSettings.v_$eq(classpath); + settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); + settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread() + .getContextClassLoader())); + BooleanSetting b = (BooleanSetting) settings.usejavacp(); + b.v_$eq(true); + settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); + + out = new ByteArrayOutputStream(); + imain = new FlinkIMain(settings, new PrintWriter(out)); + + initializeFlinkEnv(); + } + + private boolean localMode() { + return Boolean.parseBoolean(getProperty("local")); + } + + private String getRpcAddress() { + if (localMode()) { + return "localhost"; + } else { + return getProperty("jobmanager.rpc.address"); + } + } + + private int getRpcPort() { + if (localMode()) { + return localFlinkCluster.getJobManagerRPCPort(); + } else { + return Integer.parseInt(getProperty("jobmanager.rpc.port")); + } + } + + private void initializeFlinkEnv() { + // prepare bindings + imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = (Map<String, Object>) getValue("_binder"); + + flinkConf = new org.apache.flink.configuration.Configuration(); + Properties intpProperty = getProperty(); + for (Object k : intpProperty.keySet()) { + String key = (String) k; + String val = toString(intpProperty.get(key)); + 
flinkConf.setString(key, val); + } + + if (localMode()) { + startFlinkMiniCluster(); + } + + env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain); + binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env)); + + // do import and create val + imain.interpret("@transient val env = " + + "_binder.get(\"env\")" + + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]"); + + imain.interpret("import org.apache.flink.api.scala._"); + } + + + private List<File> currentClassPath() { + List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); + String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); + if (cps != null) { + for (String cp : cps) { + paths.add(new File(cp)); + } + } + return paths; + } + + private List<File> classPath(ClassLoader cl) { + List<File> paths = new LinkedList<File>(); + if (cl == null) { + return paths; + } + + if (cl instanceof URLClassLoader) { + URLClassLoader ucl = (URLClassLoader) cl; + URL[] urls = ucl.getURLs(); + if (urls != null) { + for (URL url : urls) { + paths.add(new File(url.getFile())); + } + } + } + return paths; + } + + public Object getValue(String name) { + Object ret = imain.valueOfTerm(name); + if (ret instanceof None) { + return null; + } else if (ret instanceof Some) { + return ((Some) ret).get(); + } else { + return ret; + } + } + + @Override + public void close() { + imain.close(); + + if (localMode()) { + stopFlinkMiniCluster(); + } + } + + @Override + public InterpreterResult interpret(String line, InterpreterContext context) { + if (line == null || line.trim().length() == 0) { + return new InterpreterResult(Code.SUCCESS); + } + + InterpreterResult result = interpret(line.split("\n"), context); + return result; + } + + public InterpreterResult interpret(String[] lines, InterpreterContext context) { + String[] linesToRun = new String[lines.length + 1]; + for (int i = 0; i < lines.length; i++) { + linesToRun[i] = lines[i]; + } + 
linesToRun[lines.length] = "print(\"\")"; + + Console.setOut(out); + out.reset(); + Code r = null; + + String incomplete = ""; + for (String s : linesToRun) { + scala.tools.nsc.interpreter.Results.Result res = null; + try { + res = imain.interpret(incomplete + s); + } catch (Exception e) { + logger.info("Interpreter exception", e); + return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); + } + + r = getResultCode(res); + + if (r == Code.ERROR) { + return new InterpreterResult(r, out.toString()); + } else if (r == Code.INCOMPLETE) { + incomplete += s + "\n"; + } else { + incomplete = ""; + } + } + + if (r == Code.INCOMPLETE) { + return new InterpreterResult(r, "Incomplete expression"); + } else { + return new InterpreterResult(r, out.toString()); + } + } + + private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { + if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { + return Code.SUCCESS; + } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { + return Code.INCOMPLETE; + } else { + return Code.ERROR; + } + } + + + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return new LinkedList<String>(); + } + + private void startFlinkMiniCluster() { + localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false); + localFlinkCluster.waitForTaskManagersToBeRegistered(); + } + + private void stopFlinkMiniCluster() { + if (localFlinkCluster != null) { + localFlinkCluster.shutdown(); + localFlinkCluster = null; + } + } + + static final String toString(Object o) { + return (o instanceof String) ? 
(String) o : ""; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java new file mode 100644 index 0000000..efc4951 --- /dev/null +++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.jar.JarEntry; +import java.util.jar.JarInputStream; +import java.util.jar.JarOutputStream; + +/** + * This class copied from flink-scala-shell. Once the flink-0.9 is published in + * the maven repository, this class can be removed + * + * Provides utility services for jarring and unjarring files and directories. + * Note that a given instance of JarHelper is not threadsafe with respect to + * multiple jar operations. 
+ * + * Copied from + * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans + * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source + * + * @author Patrick Calahan <[email protected]> + */ +public class JarHelper { + // ======================================================================== + // Constants + + private static final int BUFFER_SIZE = 2156; + + // ======================================================================== + // Variables + + private byte[] mBuffer = new byte[BUFFER_SIZE]; + private int mByteCount = 0; + private boolean mVerbose = false; + private String mDestJarName = ""; + + // ======================================================================== + // Constructor + + /** + * Instantiates a new JarHelper. + */ + public JarHelper() { + } + + // ======================================================================== + // Public methods + + /** + * Jars a given directory or single file into a JarOutputStream. + */ + public void jarDir(File dirOrFile2Jar, File destJar) throws IOException { + + if (dirOrFile2Jar == null || destJar == null) { + throw new IllegalArgumentException(); + } + + mDestJarName = destJar.getCanonicalPath(); + FileOutputStream fout = new FileOutputStream(destJar); + JarOutputStream jout = new JarOutputStream(fout); + // jout.setLevel(0); + try { + jarDir(dirOrFile2Jar, jout, null); + } catch (IOException ioe) { + throw ioe; + } finally { + jout.close(); + fout.close(); + } + } + + /** + * Unjars a given jar file into a given directory. + */ + public void unjarDir(File jarFile, File destDir) throws IOException { + BufferedOutputStream dest = null; + FileInputStream fis = new FileInputStream(jarFile); + unjar(fis, destDir); + } + + /** + * Given an InputStream on a jar file, unjars the contents into the given + * directory. 
+ */ + public void unjar(InputStream in, File destDir) throws IOException { + BufferedOutputStream dest = null; + JarInputStream jis = new JarInputStream(in); + JarEntry entry; + while ((entry = jis.getNextJarEntry()) != null) { + if (entry.isDirectory()) { + File dir = new File(destDir, entry.getName()); + dir.mkdir(); + if (entry.getTime() != -1) { + dir.setLastModified(entry.getTime()); + } + continue; + } + int count; + byte[] data = new byte[BUFFER_SIZE]; + File destFile = new File(destDir, entry.getName()); + if (mVerbose) { + System.out + .println("unjarring " + destFile + " from " + entry.getName()); + } + FileOutputStream fos = new FileOutputStream(destFile); + dest = new BufferedOutputStream(fos, BUFFER_SIZE); + while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { + dest.write(data, 0, count); + } + dest.flush(); + dest.close(); + if (entry.getTime() != -1) { + destFile.setLastModified(entry.getTime()); + } + } + jis.close(); + } + + public void setVerbose(boolean b) { + mVerbose = b; + } + + // ======================================================================== + // Private methods + + private static final char SEP = '/'; + + /** + * Recursively jars up the given path under the given directory. + */ + private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) + throws IOException { + if (mVerbose) { + System.out.println("checking " + dirOrFile2jar); + } + if (dirOrFile2jar.isDirectory()) { + String[] dirList = dirOrFile2jar.list(); + String subPath = (path == null) ? 
"" + : (path + dirOrFile2jar.getName() + SEP); + if (path != null) { + JarEntry je = new JarEntry(subPath); + je.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(je); + jos.flush(); + jos.closeEntry(); + } + for (int i = 0; i < dirList.length; i++) { + File f = new File(dirOrFile2jar, dirList[i]); + jarDir(f, jos, subPath); + } + } else { + if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) { + if (mVerbose) { + System.out.println("skipping " + dirOrFile2jar.getPath()); + } + return; + } + + if (mVerbose) { + System.out.println("adding " + dirOrFile2jar.getPath()); + } + FileInputStream fis = new FileInputStream(dirOrFile2jar); + try { + JarEntry entry = new JarEntry(path + dirOrFile2jar.getName()); + entry.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(entry); + while ((mByteCount = fis.read(mBuffer)) != -1) { + jos.write(mBuffer, 0, mByteCount); + if (mVerbose) { + System.out.println("wrote " + mByteCount + " bytes"); + } + } + jos.flush(); + jos.closeEntry(); + } catch (IOException ioe) { + throw ioe; + } finally { + fis.close(); + } + } + } + + // for debugging + public static void main(String[] args) throws IOException { + if (args.length < 2) { + System.err.println("Usage: JarHelper jarname.jar directory"); + return; + } + + JarHelper jarHelper = new JarHelper(); + jarHelper.mVerbose = true; + + File destJar = new File(args[0]); + File dirOrFile2Jar = new File(args[1]); + + jarHelper.jarDir(dirOrFile2Jar, destJar); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java new file mode 100644 index 0000000..264008a --- /dev/null +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -0,0 
+1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.flink; + +import static org.junit.Assert.assertEquals; + +import java.util.Properties; + +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class FlinkInterpreterTest { + + private static FlinkInterpreter flink; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + flink = new FlinkInterpreter(p); + flink.open(); + context = new InterpreterContext(null, null, null, null, null, null, null); + } + + @AfterClass + public static void tearDown() { + flink.close(); + flink.destroy(); + } + + @Test + public void testSimpleStatement() { + InterpreterResult result = flink.interpret("val a=1", context); + result = flink.interpret("print(a)", context); + assertEquals("1", result.message()); + } + + + @Test + public void testWordCount() { + flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); + 
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); + flink.interpret("counts.print()", context); + InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context); + assertEquals(Code.SUCCESS, result.code()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bbde084..dd3c2a3 100644 --- a/pom.xml +++ b/pom.xml @@ -92,6 +92,7 @@ <module>shell</module> <module>hive</module> <module>tajo</module> + <module>flink</module> <module>zeppelin-web</module> <module>zeppelin-server</module> <module>zeppelin-distribution</module> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f0301cdf/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index bbf46fc..78a463c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.angular.AngularInterpreter," + "org.apache.zeppelin.shell.ShellInterpreter," + "org.apache.zeppelin.hive.HiveInterpreter," - + "org.apache.zeppelin.tajo.TajoInterpreter"), + + "org.apache.zeppelin.tajo.TajoInterpreter," + + "org.apache.zeppelin.flink.FlinkInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
