This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa4e1d64ac26ca973ff953c6837648feb220f760 Author: Kurt Young <k...@apache.org> AuthorDate: Thu Apr 23 22:19:44 2020 +0800 [FLINK-17339][examples] Update examples due to default planner changing. --- .../apache/flink/table/examples/java/StreamSQLExample.java | 14 +++++++++----- .../flink/table/examples/java/StreamWindowSQLExample.java | 10 +--------- .../flink/table/examples/scala/StreamSQLExample.scala | 14 +++++++++----- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java index a4dbbf0..bce8054 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamSQLExample.java @@ -50,19 +50,23 @@ public class StreamSQLExample { public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); - String planner = params.has("planner") ? params.get("planner") : "flink"; + String planner = params.has("planner") ? params.get("planner") : "blink"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv; if (Objects.equals(planner, "blink")) { // use blink planner in streaming mode EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() - .inStreamingMode() - .build(); + .inStreamingMode() + .useBlinkPlanner() + .build(); tEnv = StreamTableEnvironment.create(env, settings); } else if (Objects.equals(planner, "flink")) { // use flink planner in streaming mode - tEnv = StreamTableEnvironment.create(env); + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .useOldPlanner() + .build(); + tEnv = StreamTableEnvironment.create(env, settings); } else { System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " + "where planner (it is either flink or blink, and the default is flink) indicates whether the " + diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java index f86de17..4620a8a5 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamWindowSQLExample.java @@ -19,7 +19,6 @@ package org.apache.flink.table.examples.java; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; @@ -41,16 +40,9 @@ import java.io.IOException; public class StreamWindowSQLExample { public static void main(String[] args) throws Exception { - // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // use blink planner in streaming mode, - // because watermark statement is only available in blink planner. - EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() - .inStreamingMode() - .build(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // write source data into temporary file and get the absolute path String contents = "1,beer,3,2019-12-12 00:00:01\n" + diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 5ddc95d..e95a302 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -43,18 +43,22 @@ object StreamSQLExample { def main(args: Array[String]): Unit = { val params = ParameterTool.fromArgs(args) - val planner = if (params.has("planner")) params.get("planner") else "flink" + val planner = if (params.has("planner")) params.get("planner") else "blink" // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = if (planner == "blink") { // use blink planner in streaming mode val settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() - .inStreamingMode() - .build() + .useBlinkPlanner() + .inStreamingMode() + .build() StreamTableEnvironment.create(env, settings) } else if (planner == "flink") { // use flink planner in streaming mode - StreamTableEnvironment.create(env) + val settings = EnvironmentSettings.newInstance() + .useOldPlanner() + .inStreamingMode() + .build() + StreamTableEnvironment.create(env, settings) } else { System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " + "where planner (it is either flink or blink, and the default is flink) indicates whether the " +