godfreyhe commented on a change in pull request #9362: [FLINK-13354] [docs] Add documentation for how to use blink planner
URL: https://github.com/apache/flink/pull/9362#discussion_r314605474
 
 

 ##########
 File path: docs/dev/table/common.md
 ##########
 @@ -1495,6 +1635,324 @@ Stage 4 : Data Source
 </div>
 </div>
 
+The following code shows an example and the corresponding output for multiple-sinks plan using `explain()`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+String[] fieldNames = { "count", "word" };
+TypeInformation[] fieldTypes = { Types.INT, Types.STRING };
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes));
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes));
+
+Table table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')");
+table1.insertInto("MySink1");
+
+Table table2 = table1.unionAll(tEnv.scan("MySource2"));
+table2.insertInto("MySink2");
+
+String explanation = tEnv.explain(false);
+System.out.println(explanation);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+val tEnv = TableEnvironment.create(settings)
+
+val fieldNames = Array("count", "word")
+val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
+
+val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insertInto("MySink1")
+
+val table2 = table1.unionAll(tEnv.scan("MySource2"))
+table2.insertInto("MySink2")
+
+val explanation = tEnv.explain(false)
+println(explanation)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
 
 Review comment:
   The Java, Scala, and Python code for this example (explaining a
multiple-sink plan) is very similar, and the resulting plans are identical.
For the example above (explaining a given table), however, the Python plan
differs from the Java and Scala plans, because the Python implementation of
`from_elements` is different: the Python collection is first written to a
local file, then read as a byte array in a `map`, and finally deserialized
into an object list in a `flatMap` (the code is in `PythonBridgeUtils`). As a
result, the source part of the `Execution Plan` is different.
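
   To make the three steps above concrete, here is a rough pure-Python
analogy of that pipeline. It is only a sketch and not the code in
`PythonBridgeUtils`: the function names are hypothetical, and `pickle` is an
assumed stand-in for whatever serialization Flink actually uses.

```python
import os
import pickle
import tempfile


def write_collection(elements):
    # Step 1: dump the whole collection to a local temp file.
    # (pickle is an assumption for illustration, not Flink's format.)
    fd, path = tempfile.mkstemp(suffix=".data")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(list(elements), f)
    return path


def read_bytes(path):
    # Step 2 (the "map"): one file path becomes one raw byte array.
    with open(path, "rb") as f:
        return f.read()


def deserialize(byte_array):
    # Step 3 (the "flatMap"): one byte array expands into many objects.
    return pickle.loads(byte_array)


def from_elements_sketch(elements):
    # Hypothetical helper chaining the three steps end to end.
    path = write_collection(elements)
    try:
        byte_arrays = map(read_bytes, [path])
        # Flatten: each byte array yields a list of rows.
        return [row for data in byte_arrays for row in deserialize(data)]
    finally:
        os.remove(path)
```

   The extra `map` and `flatMap` stages in this sketch correspond to the
additional operators that show up in the source part of the Python plan.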
