godfreyhe commented on a change in pull request #9362: [FLINK-13354] [docs] Add
documentation for how to use blink planner
URL: https://github.com/apache/flink/pull/9362#discussion_r314605474
##########
File path: docs/dev/table/common.md
##########
@@ -1495,6 +1635,324 @@ Stage 4 : Data Source
</div>
</div>
+The following code shows an example and the corresponding output for multiple-sinks plan using `explain()`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+String[] fieldNames = { "count", "word" };
+TypeInformation[] fieldTypes = { Types.INT, Types.STRING };
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes));
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes));
+
+Table table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')");
+table1.insertInto("MySink1");
+
+Table table2 = table1.unionAll(tEnv.scan("MySource2"));
+table2.insertInto("MySink2");
+
+String explanation = tEnv.explain(false);
+System.out.println(explanation);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+val tEnv = TableEnvironment.create(settings)
+
+val fieldNames = Array("count", "word")
+val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
+
+val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insertInto("MySink1")
+
+val table2 = table1.unionAll(tEnv.scan("MySource2"))
+table2.insertInto("MySink2")
+
+val explanation = tEnv.explain(false)
+println(explanation)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
Review comment:
The Java, Scala, and Python code for this example (explaining a multiple-sink
plan) is very similar, and the resulting plans are the same. For the example
above (explaining a given table), however, the Python plan differs from the
Java and Scala plans, because the Python implementation of `from_elements` is
different: the Python collection is first written to a local file, then read
as a byte array in a `map`, and finally deserialized into a list of objects in
a `flatMap` (the code is in `PythonBridgeUtils`). As a result, the source part
of the `Execution Plan` is different.
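
To make the three steps concrete, here is a rough, standalone sketch of the
file, `map`, `flatMap` round trip. It is not the code in `PythonBridgeUtils`:
the function names are hypothetical, and `pickle` is only a stand-in for
whatever serializer Flink actually uses.

```python
import os
import pickle
import tempfile


def dump_collection(elements):
    """Step 1: write the Python collection to a local temporary file."""
    fd, path = tempfile.mkstemp(suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        # Assumption: one pickled list; the real on-disk format may differ.
        pickle.dump(list(elements), f)
    return path


def read_bytes(path):
    """Step 2 (the `map`): read the file back as a single byte array."""
    with open(path, "rb") as f:
        return f.read()


def deserialize(raw):
    """Step 3 (the `flatMap`): expand one byte array into many objects."""
    yield from pickle.loads(raw)


def from_elements_sketch(elements):
    """Run the three steps in order and clean up the temporary file."""
    path = dump_collection(elements)
    try:
        return list(deserialize(read_bytes(path)))
    finally:
        os.remove(path)


if __name__ == "__main__":
    print(from_elements_sketch([(1, "Hi"), (2, "Hello")]))
```

The extra `map` and `flatMap` stages are what would show up in the source part
of the Python plan, whereas Java and Scala read the collection directly.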