godfreyhe commented on a change in pull request #9362: [FLINK-13354] [docs] Add documentation for how to use blink planner
URL: https://github.com/apache/flink/pull/9362#discussion_r314593013
 
 

 ##########
 File path: docs/dev/table/common.md
 ##########
 @@ -1495,6 +1635,324 @@ Stage 4 : Data Source
 </div>
 </div>
 
+The following code shows an example of a multiple-sinks plan and the corresponding output of `explain()`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+String[] fieldNames = { "count", "word" };
+TypeInformation[] fieldTypes = { Types.INT, Types.STRING };
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes));
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes));
+
+Table table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')");
+table1.insertInto("MySink1");
+
+Table table2 = table1.unionAll(tEnv.scan("MySource2"));
+table2.insertInto("MySink2");
+
+String explanation = tEnv.explain(false);
+System.out.println(explanation);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+val tEnv = TableEnvironment.create(settings)
+
+val fieldNames = Array("count", "word")
+val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
+
+val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insertInto("MySink1")
+
+val table2 = table1.unionAll(tEnv.scan("MySource2"))
+table2.insertInto("MySink2")
+
+val explanation = tEnv.explain(false)
+println(explanation)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+t_env = TableEnvironment.create(environment_settings=settings)
+
+field_names = ["count", "word"]
+field_types = [DataTypes.INT(), DataTypes.STRING()]
+t_env.register_table_source("MySource1", CsvTableSource("/source/path1", field_names, field_types))
+t_env.register_table_source("MySource2", CsvTableSource("/source/path2", field_names, field_types))
+t_env.register_table_sink("MySink1", CsvTableSink("/sink/path1", field_names, field_types))
+t_env.register_table_sink("MySink2", CsvTableSink("/sink/path2", field_names, field_types))
+
+table1 = t_env.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insert_into("MySink1")
+
+table2 = table1.union_all(t_env.scan("MySource2"))
+table2.insert_into("MySink2")
+
+explanation = t_env.explain()
+print(explanation)
+{% endhighlight %}
+</div>
+</div>
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight text %}
+
+== Abstract Syntax Tree ==
+LogicalSink(name=[MySink1], fields=[count, word])
++- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+
+LogicalSink(name=[MySink2], fields=[count, word])
++- LogicalUnion(all=[true])
+   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+Sink(name=[MySink1], fields=[count, word])
++- Reused(reference_id=[1])
+
+Sink(name=[MySink2], fields=[count, word])
++- Union(all=[true], union=[count, word])
+   :- Reused(reference_id=[1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+       content : collect elements with CollectionInputFormat
+
+       Stage 2 : Operator
+               content : CsvTableSource(read fields: count, word)
+               ship_strategy : REBALANCE
+
+               Stage 3 : Operator
+                       content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+                       ship_strategy : FORWARD
+
+                       Stage 4 : Operator
+                               content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
+                               ship_strategy : FORWARD
+
+                               Stage 5 : Operator
+                                       content : SinkConversionToRow
+                                       ship_strategy : FORWARD
+
+                                       Stage 6 : Operator
+                                               content : Map
+                                               ship_strategy : FORWARD
+
+Stage 8 : Data Source
+       content : collect elements with CollectionInputFormat
+
+       Stage 9 : Operator
+               content : CsvTableSource(read fields: count, word)
+               ship_strategy : REBALANCE
+
+               Stage 10 : Operator
+                       content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+                       ship_strategy : FORWARD
+
+                       Stage 12 : Operator
+                               content : SinkConversionToRow
+                               ship_strategy : FORWARD
+
+                               Stage 13 : Operator
+                                       content : Map
+                                       ship_strategy : FORWARD
+
+                                       Stage 7 : Data Sink
+                                               content : Sink: CsvTableSink(count, word)
+                                               ship_strategy : FORWARD
+
+                                               Stage 14 : Data Sink
+                                                       content : Sink: CsvTableSink(count, word)
+                                                       ship_strategy : FORWARD
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight text %}
+
+== Abstract Syntax Tree ==
+LogicalSink(name=[MySink1], fields=[count, word])
++- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+
+LogicalSink(name=[MySink2], fields=[count, word])
++- LogicalUnion(all=[true])
+   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+Sink(name=[MySink1], fields=[count, word])
++- Reused(reference_id=[1])
+
+Sink(name=[MySink2], fields=[count, word])
++- Union(all=[true], union=[count, word])
+   :- Reused(reference_id=[1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+       content : collect elements with CollectionInputFormat
+
+       Stage 2 : Operator
+               content : CsvTableSource(read fields: count, word)
+               ship_strategy : REBALANCE
+
+               Stage 3 : Operator
+                       content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+                       ship_strategy : FORWARD
+
+                       Stage 4 : Operator
+                               content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
+                               ship_strategy : FORWARD
+
+                               Stage 5 : Operator
+                                       content : SinkConversionToRow
+                                       ship_strategy : FORWARD
+
+                                       Stage 6 : Operator
+                                               content : Map
+                                               ship_strategy : FORWARD
+
+Stage 8 : Data Source
+       content : collect elements with CollectionInputFormat
+
+       Stage 9 : Operator
+               content : CsvTableSource(read fields: count, word)
+               ship_strategy : REBALANCE
+
+               Stage 10 : Operator
+                       content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+                       ship_strategy : FORWARD
+
+                       Stage 12 : Operator
+                               content : SinkConversionToRow
+                               ship_strategy : FORWARD
+
+                               Stage 13 : Operator
+                                       content : Map
+                                       ship_strategy : FORWARD
+
+                                       Stage 7 : Data Sink
+                                               content : Sink: CsvTableSink(count, word)
+                                               ship_strategy : FORWARD
+
+                                               Stage 14 : Data Sink
+                                                       content : Sink: CsvTableSink(count, word)
+                                                       ship_strategy : FORWARD
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
 
 Review comment:
  Those plans are the same; only one will be kept.
   
   > I think different planners will output different plan, right?
   
   yes
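
  For context, the planner is chosen through the `EnvironmentSettings` builder, which is why `explain()` can print different plans for the same query. A minimal Java sketch against the Flink 1.9 Table API (assuming the Table API and both planner dependencies are on the classpath; this mirrors the setup style used elsewhere in `common.md`, it is not part of this patch):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Blink planner: the plans shown in this example come from this setup.
EnvironmentSettings blinkSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment blinkTableEnv = TableEnvironment.create(blinkSettings);

// Old (legacy) planner: explain() on this environment produces a
// different plan representation for the same query.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings oldSettings = EnvironmentSettings.newInstance()
        .useOldPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, oldSettings);
```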

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
