[
https://issues.apache.org/jira/browse/FLINK-3738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241334#comment-15241334
]
ASF GitHub Bot commented on FLINK-3738:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/1887#discussion_r59735309
--- Diff: docs/apis/batch/libs/table.md ---
@@ -61,58 +59,73 @@ Note that the Table API is currently not part of the binary distribution. See li
Table API
----------
-The Table API provides methods for running relational operations on Tables, both in Scala and Java.
-In the following sections you can find examples that show how to create Tables, how to define and execute relational queries on them,
-and how to retrieve the result of a query as a `DataSet`.
+The Table API provides methods to apply relational operations on DataSets, both in Scala and Java.
+
+The central concept of the Table API is a `Table`, which represents a table with a relational schema (or relation). Tables can be created from a `DataSet`, converted into a `DataSet`, or registered in a table catalog using a `TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine Tables of different TableEnvironments.
+
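A minimal sketch of this lifecycle in Scala (the `registerTable` method name is an
assumption for the catalog registration mentioned above, not a confirmed API):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

val env = ExecutionEnvironment.getExecutionEnvironment
// a Table is always bound to the TableEnvironment it was created with
val tEnv = TableEnvironment.getTableEnvironment(env)

// create a Table from a DataSet
val words = env.fromElements(("hello", 1), ("ciao", 1))
val table = words.toTable(tEnv)

// register the Table in the TableEnvironment's catalog (hypothetical method name)
tEnv.registerTable("words", table)

// convert the Table back into a DataSet
val back = table.toDataSet[(String, Int)]
{% endhighlight %}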
+The following sections show by example how to use the Table API embedded in the Scala and Java DataSet APIs.
### Scala Table API
-The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
-implicit conversions that allow
-converting a DataSet to a Table. This example shows how a DataSet can
-be converted, how relational queries can be specified and how a Table can be
-converted back to a DataSet:
+The Table API is enabled by importing `org.apache.flink.api.scala.table._`. This enables
+implicit conversions to convert a DataSet to a Table. The following example shows:
+
+- how a `DataSet` is converted to a `Table`,
+- how relational queries are specified, and
+- how a `Table` is converted back to a `DataSet`.
{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
case class WC(word: String, count: Int)
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-val expr = input.toTable
-val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
+val expr = input.toTable(tEnv)
+val result = expr
+ .groupBy('word)
+ .select('word, 'count.sum as 'count)
+ .toDataSet[WC]
{% endhighlight %}
The expression DSL uses Scala symbols to refer to field names and code generation to
transform expressions to efficient runtime code. Please note that the conversion to and from
-Tables only works when using Scala case classes or Flink POJOs. Please check out
-the [Type Extraction and Serialization]({{ site.baseurl }}/internals/types_serialization.html) section
-to learn the requirements for a class to be considered a POJO.
+Tables only works when using Scala case classes or Java POJOs. Please refer to the [Type Extraction and Serialization]({{ site.baseurl }}/internals/types_serialization.html) section
+to learn the characteristics of a valid POJO.
-This is another example that shows how you
-can join two Tables:
+Another example shows how to join two Tables:
{% highlight scala %}
case class MyResult(a: String, d: Int)
-val input1 = env.fromElements(...).toTable.as('a, 'b)
-val input2 = env.fromElements(...).toTable.as('c, 'd)
-val joined = input1.join(input2).where("a = c && d > 42").select("a, d").toDataSet[MyResult]
+val input1 = env.fromElements(...).toTable(tEnv).as('a, 'b)
+val input2 = env.fromElements(...).toTable(tEnv, 'c, 'd)
+
+val joined = input1.join(input2)
+ .where("a = c && d > 42")
+ .select("a, d")
+ .toDataSet[MyResult]
{% endhighlight %}
-Notice, how a DataSet can be converted to a Table by using `as` and specifying new
-names for the fields. This can also be used to disambiguate fields before a join operation. Also,
-in this example we see that you can also use Strings to specify relational expressions.
+Notice how the field names of a Table can be changed with `as()` or specified with `toTable()`
+when converting a DataSet to a Table. In addition, the example shows how to use Strings to
+specify relational expressions.
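For comparison, the same query can presumably also be written with the symbol-based
expression DSL; this is a sketch that assumes `===` as the DSL's equality operator:

{% highlight scala %}
// symbol-based sketch of the String expressions used above
val joined2 = input1.join(input2)
  .where('a === 'c && 'd > 42)
  .select('a, 'd)
  .toDataSet[MyResult]
{% endhighlight %}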
-Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
-description of the expression syntax.
+Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.
{% top %}
### Java Table API
-When using Java, Tables can be converted to and from DataSet using `TableEnvironment`.
-This example is equivalent to the above Scala Example:
+When using Flink's Java DataSet API, DataSet are converted to Tables and Tables to DataSet using a `TableEnvironment`.
--- End diff ---
*DataSets
> Refactor TableEnvironment and TranslationContext
> ------------------------------------------------
>
> Key: FLINK-3738
> URL: https://issues.apache.org/jira/browse/FLINK-3738
> Project: Flink
> Issue Type: Task
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
>
> Currently the TableAPI uses a static object called {{TranslationContext}}
> which holds the Calcite table catalog and a Calcite planner instance.
> Whenever a {{DataSet}} or {{DataStream}} is converted into a {{Table}} or
> registered as a {{Table}} on the {{TableEnvironment}}, a new entry is added
> to the catalog. The first time a {{Table}} is added, a planner instance is
> created. The planner is used to optimize the query (defined by one or more
> Table API operations and/or one or more SQL queries) when a {{Table}} is
> converted into a {{DataSet}} or {{DataStream}}. Since a planner may only be
> used to optimize a single program, the choice of a single static object is
> problematic.
> I propose to refactor the {{TableEnvironment}} to take over the
> responsibility of holding the catalog and the planner instance.
> - A {{TableEnvironment}} holds a catalog of registered tables and a single
> planner instance.
> - A {{TableEnvironment}} will only allow translating a single {{Table}}
> (possibly composed of several Table API operations and SQL queries) into a
> {{DataSet}} or {{DataStream}}.
> - A {{TableEnvironment}} is bound to an {{ExecutionEnvironment}} or a
> {{StreamExecutionEnvironment}}. This is necessary to create data sources or
> source functions to read external tables or streams.
> - {{DataSet}} and {{DataStream}} need a reference to a {{TableEnvironment}}
> to be converted into a {{Table}}. This will prohibit the implicit conversions
> currently supported by the DataSet Scala API.
> - A {{Table}} needs a reference to the {{TableEnvironment}} it is bound to.
> Only tables from the same {{TableEnvironment}} can be processed together.
> - The {{TranslationContext}} will be completely removed.
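>
> A minimal sketch of the proposed usage (the {{registerTable}} method name is an
> illustrative assumption, not a final API):
> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.scala.table._
>
> case class WC(word: String, count: Int)
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> // the TableEnvironment is bound to the ExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // explicit conversion: the DataSet needs a reference to the TableEnvironment
> val input = env.fromElements(WC("hello", 1), WC("ciao", 1))
> val table = input.toTable(tEnv)
>
> // hypothetical catalog registration on the TableEnvironment
> tEnv.registerTable("WordCount", table)
>
> // translation back to a DataSet goes through the same TableEnvironment
> val result = table
>   .groupBy('word)
>   .select('word, 'count.sum as 'count)
>   .toDataSet[WC]
> {code}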
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)