[ 
https://issues.apache.org/jira/browse/FLINK-3738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241334#comment-15241334
 ] 

ASF GitHub Bot commented on FLINK-3738:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1887#discussion_r59735309
  
    --- Diff: docs/apis/batch/libs/table.md ---
    @@ -61,58 +59,73 @@ Note that the Table API is currently not part of the 
binary distribution. See li
     
     Table API
     ----------
    -The Table API provides methods for running relational operations on 
Tables, both in Scala and Java.
    -In the following sections you can find examples that show how to create 
Tables, how to define and execute relational queries on them,
    -and how to retrieve the result of a query as a `DataSet`.
    +The Table API provides methods to apply relational operations on DataSets, 
both in Scala and Java.
    +
    +The central concept of the Table API is a `Table` which represents a 
table with relational schema (or relation). Tables can be created from a 
`DataSet`, converted into a `DataSet`, or registered in a table catalog using a 
`TableEnvironment`. A `Table` is always bound to a specific `TableEnvironment`. 
It is not possible to combine Tables of different TableEnvironments. 
    +
    +The following sections show by example how to use the Table API embedded 
in  the Scala and Java DataSet APIs.
     
     ### Scala Table API
     
    -The Table API can be enabled by importing 
`org.apache.flink.api.scala.table._`. This enables
    -implicit conversions that allow
    -converting a DataSet to a Table. This example shows how a DataSet can
    -be converted, how relational queries can be specified and how a Table can 
be
    -converted back to a DataSet:
    +The Table API is enabled by importing 
`org.apache.flink.api.scala.table._`. This enables
    +implicit conversions to convert a DataSet to a Table. The following 
example shows:
    +
    +- how a `DataSet` is converted to a `Table`,
    +- how relational queries are specified, and 
    +- how a `Table` is converted back to a `DataSet`.
     
     {% highlight scala %}
     import org.apache.flink.api.scala._
     import org.apache.flink.api.scala.table._
     
     case class WC(word: String, count: Int)
    +
    +val env = ExecutionEnvironment.getExecutionEnvironment
    +val tEnv = TableEnvironment.getTableEnvironment(env)
    +
     val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
    -val expr = input.toTable
    -val result = expr.groupBy('word).select('word, 'count.sum as 
'count).toDataSet[WC]
    +val expr = input.toTable(tEnv)
    +val result = expr
    +               .groupBy('word)
    +               .select('word, 'count.sum as 'count)
    +               .toDataSet[WC]
     {% endhighlight %}
     
     The expression DSL uses Scala symbols to refer to field names and code 
generation to
     transform expressions to efficient runtime code. Please note that the 
conversion to and from
    -Tables only works when using Scala case classes or Flink POJOs. Please 
check out
    -the [Type Extraction and Serialization]({{ site.baseurl 
}}/internals/types_serialization.html) section
    -to learn the requirements for a class to be considered a POJO.
    +Tables only works when using Scala case classes or Java POJOs. Please 
refer to the [Type Extraction and Serialization]({{ site.baseurl 
}}/internals/types_serialization.html) section
    +to learn the characteristics of a valid POJO.
     
    -This is another example that shows how you
    -can join two Tables:
    +Another example shows how to join two Tables:
     
     {% highlight scala %}
     case class MyResult(a: String, d: Int)
     
    -val input1 = env.fromElements(...).toTable.as('a, 'b)
    -val input2 = env.fromElements(...).toTable.as('c, 'd)
    -val joined = input1.join(input2).where("a = c && d > 42").select("a, 
d").toDataSet[MyResult]
    +val input1 = env.fromElements(...).toTable(tEnv).as('a, 'b)
    +val input2 = env.fromElements(...).toTable(tEnv, 'c, 'd)
    +
    +val joined = input1.join(input2)
    +               .where("a = c && d > 42")
    +               .select("a, d")
    +               .toDataSet[MyResult]
     {% endhighlight %}
     
    -Notice, how a DataSet can be converted to a Table by using `as` and 
specifying new
    -names for the fields. This can also be used to disambiguate fields before 
a join operation. Also,
    -in this example we see that you can also use Strings to specify relational 
expressions.
    +Notice, how the field names of a Table can be changed with `as()` or 
specified with `toTable()` when converting a DataSet to a Table. In addition, 
the example shows how to use Strings to specify relational expressions.
     
    -Please refer to the Scaladoc (and Javadoc) for a full list of supported 
operations and a
    -description of the expression syntax.
    +Please refer to the Scaladoc (and Javadoc) for a full list of supported 
operations and a description of the expression syntax.
     
     {% top %}
     
     ### Java Table API
     
    -When using Java, Tables can be converted to and from DataSet using 
`TableEnvironment`.
    -This example is equivalent to the above Scala Example:
    +When using Flink's Java DataSet API, DataSet are converted to Tables and 
Tables to DataSet using a `TableEnvironment`.
    --- End diff --
    
    *DataSets


> Refactor TableEnvironment and TranslationContext
> ------------------------------------------------
>
>                 Key: FLINK-3738
>                 URL: https://issues.apache.org/jira/browse/FLINK-3738
>             Project: Flink
>          Issue Type: Task
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> Currently, the Table API uses a static object called {{TranslationContext}} 
> which holds the Calcite table catalog and a Calcite planner instance. 
> Whenever a {{DataSet}} or {{DataStream}} is converted into a {{Table}} or 
> registered as a {{Table}} on the {{TableEnvironment}}, a new entry is added 
> to the catalog. The first time a {{Table}} is added, a planner instance is 
> created. The planner is used to optimize the query (defined by one or more 
> Table API operations and/or one or more SQL queries) when a {{Table}} is 
> converted into a {{DataSet}} or {{DataStream}}. Since a planner may only be 
> used to optimize a single program, the choice of a single static object is 
> problematic.
> I propose to refactor the {{TableEnvironment}} to take over the 
> responsibility of holding the catalog and the planner instance. 
> - A {{TableEnvironment}} holds a catalog of registered tables and a single 
> planner instance.
> - A {{TableEnvironment}} will only allow translating a single {{Table}} 
> (possibly composed of several Table API operations and SQL queries) into a 
> {{DataSet}} or {{DataStream}}. 
> - A {{TableEnvironment}} is bound to an {{ExecutionEnvironment}} or a 
> {{StreamExecutionEnvironment}}. This is necessary to create data sources or 
> source functions to read external tables or streams.
> - {{DataSet}} and {{DataStream}} need a reference to a {{TableEnvironment}} 
> to be converted into a {{Table}}. This will prohibit implicit casts as 
> currently supported for the DataSet Scala API.
> - A {{Table}} needs a reference to the {{TableEnvironment}} it is bound to. 
> Only tables from the same {{TableEnvironment}} can be processed together.
> - The {{TranslationContext}} will be completely removed.
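
The ownership rules proposed in the description above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `ToyTableEnvironment`, `ToyTable`, `register`, and `join` are hypothetical names invented for illustration and are not Flink's API. The Calcite planner is left out entirely. The sketch shows only that each environment owns its own catalog, that each table keeps a reference to its environment, and that combining tables from different environments is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these classes are part of Flink.
final class TableEnvSketch {

    /** Hypothetical stand-in for a TableEnvironment: owns its own catalog. */
    static final class ToyTableEnvironment {
        private final Map<String, ToyTable> catalog = new HashMap<>();

        /** Creates a table bound to this environment and adds it to the catalog. */
        ToyTable register(String name) {
            ToyTable table = new ToyTable(name, this);
            catalog.put(name, table);
            return table;
        }

        boolean isRegistered(String name) {
            return catalog.containsKey(name);
        }
    }

    /** Hypothetical stand-in for a Table: keeps a reference to its environment. */
    static final class ToyTable {
        final String name;
        final ToyTableEnvironment env;

        ToyTable(String name, ToyTableEnvironment env) {
            this.name = name;
            this.env = env;
        }

        /** Only tables bound to the same environment may be combined. */
        ToyTable join(ToyTable other) {
            if (other.env != this.env) {
                throw new IllegalArgumentException(
                    "Tables are bound to different environments");
            }
            // The result stays bound to the shared environment (not registered by name).
            return new ToyTable("(" + name + " JOIN " + other.name + ")", env);
        }
    }

    public static void main(String[] args) {
        ToyTableEnvironment envA = new ToyTableEnvironment();
        ToyTableEnvironment envB = new ToyTableEnvironment();

        ToyTable a = envA.register("a");
        ToyTable b = envA.register("b");
        ToyTable c = envB.register("c");

        // Same environment: the join is accepted.
        System.out.println(a.join(b).name);

        // Different environments: the join is rejected.
        try {
            a.join(c);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the catalog is an instance field rather than a static object, two environments in the same JVM do not see each other's tables, which is the problem the static {{TranslationContext}} is described as having.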



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
