Repository: incubator-zeppelin Updated Branches: refs/heads/master 0868dcbd9 -> c46d8a010
ZEPPELIN-145 Upgrade Flink version to 0.9.0 https://issues.apache.org/jira/browse/ZEPPELIN-145 Upgrade flink version to 0.9.0 Author: Lee moon soo <[email protected]> Closes #142 from Leemoonsoo/ZEPPELIN-145 and squashes the following commits: c60673c [Lee moon soo] Fix test 9097e04 [Lee moon soo] Change code to use FlinkILoop that is introduced in 0.9.0 Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/c46d8a01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/c46d8a01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/c46d8a01 Branch: refs/heads/master Commit: c46d8a01030e7d1ca41320cf4467a706f46167e5 Parents: 0868dcb Author: Lee moon soo <[email protected]> Authored: Wed Jul 8 13:38:55 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Sat Jul 11 00:44:21 2015 -0700 ---------------------------------------------------------------------- flink/pom.xml | 23 +- .../apache/zeppelin/flink/FlinkEnvironment.java | 83 ------- .../org/apache/zeppelin/flink/FlinkIMain.java | 92 -------- .../apache/zeppelin/flink/FlinkInterpreter.java | 138 ++++++------ .../org/apache/zeppelin/flink/JarHelper.java | 219 ------------------- .../zeppelin/flink/FlinkInterpreterTest.java | 3 +- 6 files changed, 82 insertions(+), 476 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index bfdc29b..b6c790b 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -34,28 +34,13 @@ <url>http://zeppelin.incubator.apache.org</url> <properties> - <flink.version>0.9.0-milestone-1</flink.version> + <flink.version>0.9.0</flink.version> <flink.akka.version>2.3.7</flink.akka.version> <flink.scala.binary.version>2.10</flink.scala.binary.version> <flink.scala.version>2.10.4</flink.scala.version> <scala.macros.version>2.0.1</scala.macros.version> </properties> - <repositories> - <!-- for flink 0.9-SNAPSHOT. After 0.9 released, it can be removed --> - <repository> - <id>apache.snapshots</id> - <name>Apache Development Snapshot Repository</name> - <url>https://repository.apache.org/content/repositories/snapshots/</url> - <releases> - <enabled>false</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - </snapshots> - </repository> - </repositories> - <dependencies> <dependency> <groupId>org.slf4j</groupId> @@ -104,6 +89,12 @@ </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala-shell</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_${flink.scala.binary.version}</artifactId> <version>${flink.akka.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java deleted file mode 100644 index 629932b..0000000 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.flink; - -import java.io.File; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.PlanExecutor; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.translation.JavaPlan; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The class override execute() method to create an PlanExecutor with - * jar file that packages classes from scala compiler. - */ -public class FlinkEnvironment extends ExecutionEnvironment { - Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class); - - private String host; - private int port; - - private FlinkIMain imain; - - public FlinkEnvironment(String host, int port, FlinkIMain imain) { - this.host = host; - this.port = port; - this.imain = imain; - - logger.info("jobManager host={}, port={}", host, port); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - JavaPlan plan = createProgramPlan(jobName); - - File jarFile = imain.jar(); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, - jarFile.getAbsolutePath()); - - JobExecutionResult result = executor.executePlan(plan); - - if (jarFile.isFile()) { - jarFile.delete(); - } - - return result; - } - - @Override - public String getExecutionPlan() throws Exception { - JavaPlan plan = createProgramPlan("unnamed", false); - plan.setDefaultParallelism(getParallelism()); - registerCachedFilesWithPlan(plan); - - File jarFile = imain.jar(); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, - jarFile.getAbsolutePath()); - String jsonPlan = executor.getOptimizerPlanAsJSON(plan); - - if (jarFile != null && jarFile.isFile()) { - jarFile.delete(); - } - - return jsonPlan; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java deleted file mode 100644 index ee6516c..0000000 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.flink; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; - -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import scala.collection.Iterator; -import scala.reflect.io.AbstractFile; -import scala.reflect.io.VirtualDirectory; -import scala.tools.nsc.Settings; -import scala.tools.nsc.interpreter.IMain; - -/** - * Scala compiler - */ -public class FlinkIMain extends IMain { - Logger logger = LoggerFactory.getLogger(FlinkIMain.class); - - public FlinkIMain(Settings setting, PrintWriter out) { - super(setting, out); - } - - public File jar() throws IOException { - VirtualDirectory classDir = virtualDirectory(); - // create execution environment - File jarBuildDir = new File(System.getProperty("java.io.tmpdir") - + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis()); - jarBuildDir.mkdirs(); - - File jarFile = new File(System.getProperty("java.io.tmpdir") - + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar"); - - - Iterator<AbstractFile> vdIt = classDir.iterator(); - while (vdIt.hasNext()) { - AbstractFile fi = vdIt.next(); - if (fi.isDirectory()) { - Iterator<AbstractFile> fiIt = fi.iterator(); - while (fiIt.hasNext()) { - AbstractFile f = fiIt.next(); - - // directory for compiled line - File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name()); - lineDir.mkdirs(); - - // compiled classes for commands from shell - File writeFile = new File(lineDir.getAbsolutePath(), f.name()); - FileOutputStream outputStream = new FileOutputStream(writeFile); - InputStream inputStream = f.input(); - - // copy file contents - org.apache.commons.io.IOUtils.copy(inputStream, outputStream); - - inputStream.close(); - outputStream.close(); - } - } - } - - // jar up - JarHelper jh = new JarHelper(); - jh.jarDir(jarBuildDir, jarFile); - - FileUtils.deleteDirectory(jarBuildDir); - return jarFile; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index b342f4e..c516b9d 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,6 +17,7 @@ */ package org.apache.zeppelin.flink; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintWriter; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.Properties; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.zeppelin.interpreter.Interpreter; @@ -41,8 +43,10 @@ import org.slf4j.LoggerFactory; import scala.Console; import scala.None; +import scala.Option; import scala.Some; import scala.tools.nsc.Settings; +import scala.tools.nsc.interpreter.IMain; import scala.tools.nsc.settings.MutableSettings.BooleanSetting; import scala.tools.nsc.settings.MutableSettings.PathSetting; @@ -51,13 +55,12 @@ import scala.tools.nsc.settings.MutableSettings.PathSetting; */ public class FlinkInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class); - private Settings settings; private ByteArrayOutputStream out; - private FlinkIMain imain; - private Map<String, Object> binder; - private ExecutionEnvironment env; private Configuration flinkConf; private LocalFlinkMiniCluster localFlinkCluster; + private FlinkILoop flinkIloop; + private Map<String, Object> binder; + private IMain imain; public FlinkInterpreter(Properties property) { super(property); @@ -69,17 +72,72 @@ public class FlinkInterpreter extends Interpreter { "flink", FlinkInterpreter.class.getName(), new InterpreterPropertyBuilder() - .add("local", "true", "Run flink locally") - .add("jobmanager.rpc.address", "localhost", "Flink cluster") - .add("jobmanager.rpc.port", "6123", "Flink cluster") + .add("host", "local", + "host name of running JobManager. 'local' runs flink in local mode") + .add("port", "6123", "port of running JobManager") .build() ); } @Override public void open() { + out = new ByteArrayOutputStream(); + flinkConf = new org.apache.flink.configuration.Configuration(); + Properties intpProperty = getProperty(); + for (Object k : intpProperty.keySet()) { + String key = (String) k; + String val = toString(intpProperty.get(key)); + flinkConf.setString(key, val); + } + + if (localMode()) { + startFlinkMiniCluster(); + } + + flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out)); + flinkIloop.settings_$eq(createSettings()); + flinkIloop.createInterpreter(); + + imain = flinkIloop.intp(); + + // prepare bindings + imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = (Map<String, Object>) getValue("_binder"); + + // import libraries + imain.interpret("import scala.tools.nsc.io._"); + imain.interpret("import Properties.userHome"); + imain.interpret("import scala.compat.Platform.EOL"); + + imain.interpret("import org.apache.flink.api.scala._"); + imain.interpret("import org.apache.flink.api.common.functions._"); + imain.bindValue("env", flinkIloop.scalaEnv()); + } + + private boolean localMode() { + String host = getProperty("host"); + return host == null || host.trim().length() == 0 || host.trim().equals("local"); + } + + private String getHost() { + if (localMode()) { + return "localhost"; + } else { + return getProperty("host"); + } + } + + private int getPort() { + if (localMode()) { + return localFlinkCluster.getJobManagerRPCPort(); + } else { + return Integer.parseInt(getProperty("port")); + } + } + + private Settings createSettings() { URL[] urls = getClassloaderUrls(); - this.settings = new Settings(); + Settings settings = new Settings(); // set classpath PathSetting pathSettings = settings.classpath(); @@ -108,61 +166,10 @@ public class FlinkInterpreter extends Interpreter { BooleanSetting b = (BooleanSetting) settings.usejavacp(); b.v_$eq(true); settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - out = new ByteArrayOutputStream(); - imain = new FlinkIMain(settings, new PrintWriter(out)); - - initializeFlinkEnv(); - } - - private boolean localMode() { - return Boolean.parseBoolean(getProperty("local")); - } - - private String getRpcAddress() { - if (localMode()) { - return "localhost"; - } else { - return getProperty("jobmanager.rpc.address"); - } - } - - private int getRpcPort() { - if (localMode()) { - return localFlinkCluster.getJobManagerRPCPort(); - } else { - return Integer.parseInt(getProperty("jobmanager.rpc.port")); - } - } - - private void initializeFlinkEnv() { - // prepare bindings - imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map<String, Object>) getValue("_binder"); - - flinkConf = new org.apache.flink.configuration.Configuration(); - Properties intpProperty = getProperty(); - for (Object k : intpProperty.keySet()) { - String key = (String) k; - String val = toString(intpProperty.get(key)); - flinkConf.setString(key, val); - } - - if (localMode()) { - startFlinkMiniCluster(); - } - - env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain); - binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env)); - - // do import and create val - imain.interpret("@transient val env = " - + "_binder.get(\"env\")" - + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]"); - - imain.interpret("import org.apache.flink.api.scala._"); + + return settings; } - + private List<File> currentClassPath() { List<File> paths = classPath(Thread.currentThread().getContextClassLoader()); @@ -194,6 +201,7 @@ public class FlinkInterpreter extends Interpreter { } public Object getValue(String name) { + IMain imain = flinkIloop.intp(); Object ret = imain.valueOfTerm(name); if (ret instanceof None) { return null; @@ -206,7 +214,7 @@ public class FlinkInterpreter extends Interpreter { @Override public void close() { - imain.close(); + flinkIloop.closeInterpreter(); if (localMode()) { stopFlinkMiniCluster(); @@ -224,6 +232,8 @@ public class FlinkInterpreter extends Interpreter { } public InterpreterResult interpret(String[] lines, InterpreterContext context) { + IMain imain = flinkIloop.intp(); + String[] linesToRun = new String[lines.length + 1]; for (int i = 0; i < lines.length; i++) { linesToRun[i] = lines[i]; @@ -300,7 +310,7 @@ public class FlinkInterpreter extends Interpreter { private void stopFlinkMiniCluster() { if (localFlinkCluster != null) { - localFlinkCluster.shutdown(); + localFlinkCluster.stop(); localFlinkCluster = null; } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java deleted file mode 100644 index efc4951..0000000 --- a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.flink; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.jar.JarEntry; -import java.util.jar.JarInputStream; -import java.util.jar.JarOutputStream; - -/** - * This class copied from flink-scala-shell. Once the flink-0.9 is published in - * the maven repository, this class can be removed - * - * Provides utility services for jarring and unjarring files and directories. - * Note that a given instance of JarHelper is not threadsafe with respect to - * multiple jar operations. - * - * Copied from - * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans - * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source - * - * @author Patrick Calahan <[email protected]> - */ -public class JarHelper { - // ======================================================================== - // Constants - - private static final int BUFFER_SIZE = 2156; - - // ======================================================================== - // Variables - - private byte[] mBuffer = new byte[BUFFER_SIZE]; - private int mByteCount = 0; - private boolean mVerbose = false; - private String mDestJarName = ""; - - // ======================================================================== - // Constructor - - /** - * Instantiates a new JarHelper. - */ - public JarHelper() { - } - - // ======================================================================== - // Public methods - - /** - * Jars a given directory or single file into a JarOutputStream. - */ - public void jarDir(File dirOrFile2Jar, File destJar) throws IOException { - - if (dirOrFile2Jar == null || destJar == null) { - throw new IllegalArgumentException(); - } - - mDestJarName = destJar.getCanonicalPath(); - FileOutputStream fout = new FileOutputStream(destJar); - JarOutputStream jout = new JarOutputStream(fout); - // jout.setLevel(0); - try { - jarDir(dirOrFile2Jar, jout, null); - } catch (IOException ioe) { - throw ioe; - } finally { - jout.close(); - fout.close(); - } - } - - /** - * Unjars a given jar file into a given directory. - */ - public void unjarDir(File jarFile, File destDir) throws IOException { - BufferedOutputStream dest = null; - FileInputStream fis = new FileInputStream(jarFile); - unjar(fis, destDir); - } - - /** - * Given an InputStream on a jar file, unjars the contents into the given - * directory. - */ - public void unjar(InputStream in, File destDir) throws IOException { - BufferedOutputStream dest = null; - JarInputStream jis = new JarInputStream(in); - JarEntry entry; - while ((entry = jis.getNextJarEntry()) != null) { - if (entry.isDirectory()) { - File dir = new File(destDir, entry.getName()); - dir.mkdir(); - if (entry.getTime() != -1) { - dir.setLastModified(entry.getTime()); - } - continue; - } - int count; - byte[] data = new byte[BUFFER_SIZE]; - File destFile = new File(destDir, entry.getName()); - if (mVerbose) { - System.out - .println("unjarring " + destFile + " from " + entry.getName()); - } - FileOutputStream fos = new FileOutputStream(destFile); - dest = new BufferedOutputStream(fos, BUFFER_SIZE); - while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { - dest.write(data, 0, count); - } - dest.flush(); - dest.close(); - if (entry.getTime() != -1) { - destFile.setLastModified(entry.getTime()); - } - } - jis.close(); - } - - public void setVerbose(boolean b) { - mVerbose = b; - } - - // ======================================================================== - // Private methods - - private static final char SEP = '/'; - - /** - * Recursively jars up the given path under the given directory. - */ - private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) - throws IOException { - if (mVerbose) { - System.out.println("checking " + dirOrFile2jar); - } - if (dirOrFile2jar.isDirectory()) { - String[] dirList = dirOrFile2jar.list(); - String subPath = (path == null) ? "" - : (path + dirOrFile2jar.getName() + SEP); - if (path != null) { - JarEntry je = new JarEntry(subPath); - je.setTime(dirOrFile2jar.lastModified()); - jos.putNextEntry(je); - jos.flush(); - jos.closeEntry(); - } - for (int i = 0; i < dirList.length; i++) { - File f = new File(dirOrFile2jar, dirList[i]); - jarDir(f, jos, subPath); - } - } else { - if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) { - if (mVerbose) { - System.out.println("skipping " + dirOrFile2jar.getPath()); - } - return; - } - - if (mVerbose) { - System.out.println("adding " + dirOrFile2jar.getPath()); - } - FileInputStream fis = new FileInputStream(dirOrFile2jar); - try { - JarEntry entry = new JarEntry(path + dirOrFile2jar.getName()); - entry.setTime(dirOrFile2jar.lastModified()); - jos.putNextEntry(entry); - while ((mByteCount = fis.read(mBuffer)) != -1) { - jos.write(mBuffer, 0, mByteCount); - if (mVerbose) { - System.out.println("wrote " + mByteCount + " bytes"); - } - } - jos.flush(); - jos.closeEntry(); - } catch (IOException ioe) { - throw ioe; - } finally { - fis.close(); - } - } - } - - // for debugging - public static void main(String[] args) throws IOException { - if (args.length < 2) { - System.err.println("Usage: JarHelper jarname.jar directory"); - return; - } - - JarHelper jarHelper = new JarHelper(); - jarHelper.mVerbose = true; - - File destJar = new File(args[0]); - File dirOrFile2Jar = new File(args[1]); - - jarHelper.jarDir(dirOrFile2Jar, destJar); - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/c46d8a01/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index 264008a..37f9c27 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -59,8 +59,7 @@ public class FlinkInterpreterTest { public void testWordCount() { flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); - flink.interpret("counts.print()", context); - InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context); + InterpreterResult result = flink.interpret("counts.print()", context); assertEquals(Code.SUCCESS, result.code()); } }
