Repository: zeppelin Updated Branches: refs/heads/master d005c7967 -> 3a1568efc
[ZEPPELIN-1461] Update Flink with latest version 1.1.2 ### What is this PR for? Flink has had two releases since 1.0.3; we are now on 1.1.2. This includes new functionality for streaming support in the REPL environment. ### What type of PR is it? Improvement ### Todos * [x] - Update `pom.xml` * [x] - Update single (batch) environment to batch and streaming environments * [x] - Update Test to reflect `benv` (instead of `env`) ### What is the Jira issue? [https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1](https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1) ### How should this be tested? Tests for previous versions are the same as for the new version. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? Yes: older code written in the Flink interpreter will now have to use `benv` in place of `env` * Does this need documentation? No Author: rawkintrevo <[email protected]> Closes #1409 from rawkintrevo/zeppelin-1461 and squashes the following commits: 78502f0 [rawkintrevo] [ZEPPELIN-1461] Retrigger build 9b2d122 [rawkintrevo] [ZEPPELIN-1461] Update Test and remove unneeded code 9921a7e [rawkintrevo] [ZEPPELIN-1461] Update Flink with latest version 1.1.2 Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3a1568ef Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3a1568ef Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3a1568ef Branch: refs/heads/master Commit: 3a1568efc9a31394292bc359bb9bf42e1eb0f9f9 Parents: d005c79 Author: rawkintrevo <[email protected]> Authored: Thu Sep 8 22:56:27 2016 -0500 Committer: Lee moon soo <[email protected]> Committed: Sat Sep 10 10:38:57 2016 -0700 ---------------------------------------------------------------------- flink/pom.xml | 2 +- .../apache/zeppelin/flink/FlinkInterpreter.java | 38 ++++++++++++++------ 
.../zeppelin/flink/FlinkInterpreterTest.java | 2 +- 3 files changed, 30 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 628f542..1686d06 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -34,7 +34,7 @@ <description>Zeppelin flink support</description> <properties> - <flink.version>1.0.3</flink.version> + <flink.version>1.1.2</flink.version> <flink.akka.version>2.3.7</flink.akka.version> <scala.macros.version>2.0.1</scala.macros.version> </properties> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index d3229cf..5ce3b85 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -30,6 +30,7 @@ import java.util.*; import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; import scala.Console; import scala.None; +import scala.Option; import scala.Some; import scala.collection.JavaConversions; import scala.collection.immutable.Nil; @@ -83,14 +85,25 @@ public class FlinkInterpreter extends Interpreter { startFlinkMiniCluster(); } - 
flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out)); + flinkIloop = new FlinkILoop(getHost(), + getPort(), + flinkConf, + (BufferedReader) null, + new PrintWriter(out)); + flinkIloop.settings_$eq(createSettings()); flinkIloop.createInterpreter(); - + imain = flinkIloop.intp(); - org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv(); - env.getConfig().disableSysoutLogging(); + org.apache.flink.api.scala.ExecutionEnvironment benv = + flinkIloop.scalaBenv(); + //new ExecutionEnvironment(remoteBenv) + org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv = + flinkIloop.scalaSenv(); + + senv.getConfig().disableSysoutLogging(); + benv.getConfig().disableSysoutLogging(); // prepare bindings imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); @@ -100,13 +113,19 @@ public class FlinkInterpreter extends Interpreter { imain.interpret("import scala.tools.nsc.io._"); imain.interpret("import Properties.userHome"); imain.interpret("import scala.compat.Platform.EOL"); - + imain.interpret("import org.apache.flink.api.scala._"); imain.interpret("import org.apache.flink.api.common.functions._"); - binder.put("env", env); - imain.interpret("val env = _binder.get(\"env\").asInstanceOf[" - + env.getClass().getName() + "]"); + + binder.put("benv", benv); + imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf[" + + benv.getClass().getName() + "]"); + + binder.put("senv", senv); + imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf[" + + senv.getClass().getName() + "]"); + } private boolean localMode() { @@ -313,8 +332,6 @@ public class FlinkInterpreter extends Interpreter { } } - - @Override public void cancel(InterpreterContext context) { } @@ -354,4 +371,5 @@ public class FlinkInterpreter extends Interpreter { static final String toString(Object o) { return (o instanceof String) ? 
(String) o : ""; } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java index b6f9db6..1d8f437 100644 --- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java +++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java @@ -81,7 +81,7 @@ public class FlinkInterpreterTest { @Test public void testWordCount() { - flink.interpret("val text = env.fromElements(\"To be or not to be\")", context); + flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context); flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context); InterpreterResult result = flink.interpret("counts.print()", context); assertEquals(Code.SUCCESS, result.code());
