[ 
https://issues.apache.org/jira/browse/FLINK-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032128#comment-16032128
 ] 

ASF GitHub Bot commented on FLINK-6746:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4012#discussion_r119490480
  
    --- Diff: docs/dev/table/common.md ---
    @@ -98,374 +89,767 @@ env.execute("Your Query")
     </div>
     </div>
     
    +**Note:** Table API and SQL queries can be easily integrated with and 
embedded into DataStream or DataSet programs. Have a look a the [Integration 
with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) 
section to learn how DataStreams and DataSets can be converted into Tables and 
vice versa.
    +
     {% top %}
     
     Create a TableEnvironment
     -------------------------
     
    -A `Table` is always bound to a specific `TableEnvironment`. It is not 
possible to combine Tables of different TableEnvironments.
    +The `TableEnvironment` is a central concept of the Table API and SQL 
integration. It is responsible for:
    +* Registering a `Table` in the internal catalog
    +* Registering an external catalog 
    +* Executing SQL queries
    +* Registering a user-defined (scalar, table, or aggregation) function
    +* Converting a `DataStream` or `DataSet` into a `Table`
    +* Holding a reference to an `ExecutionEnvironment` or 
`StreamExecutionEnvironment`
    +
    +A `Table` is always bound to a specific `TableEnvironment`. It is not 
process tables of different TableEnvironments in the same query, e.g., to join 
or union them.
    +
    +A `TableEnvironment` is created by calling the static 
`TableEnvironment.getTableEnvironment()` method with a 
`StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional 
`TableConfig`. The `TableConfig` can be used to configure the 
`TableEnvironment` or to customize the query optimization and translation 
process (see [Query Optimization](#query-optimization)).
     
    -**TODO: Extend**
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// ***************
    +// STREAMING QUERY
    +// ***************
    +StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +// Create a TableEnvironment for streaming queries
    +StreamTableEnvironment sTableEnv = 
TableEnvironment.getTableEnvironment(sEnv);
    +
    +// ***********
    +// BATCH QUERY
    +// ***********
    +ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
    +// Create a TableEnvironment for batch queries
    +BatchTableEnvironment bTableEnv = 
TableEnvironment.getTableEnvironment(bEnv);
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// ***************
    +// STREAMING QUERY
    +// ***************
    +val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    +// Create a TableEnvironment for streaming queries
    +val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
    +
    +// ***********
    +// BATCH QUERY
    +// ***********
    +val bEnv = ExecutionEnvironment.getExecutionEnvironment
    +// Create a TableEnvironment for batch queries
    +val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
    +{% endhighlight %}
    +</div>
    +</div>
     
     {% top %}
     
     Register a Table in the Catalog
     -------------------------------
     
    -`TableEnvironment`s have an internal table catalog to which tables can be 
registered with a unique name. After registration, a table can be accessed from 
the `TableEnvironment` by its name.
    +A `TableEnvironment` has an internal catalog to register tables by name. 
Table API or SQL queries can access tables, which are registered in the catalog 
by referencing them with their name. 
     
    -*Note: `DataSet`s or `DataStream`s can be directly converted into `Table`s 
without registering them in the `TableEnvironment`. See [Create a Table from a 
DataStream or DataSet](#tbd) for details.
    +A `TableEnvironment` allows to register a table from various sources:
    +* an existing `Table` object, usually the result of a Table API or SQL 
query.
    +* a `TableSource`, which accesses external data, such as a file, database, 
or messaging system. 
    +* a `DataStream` or `DataSet` from a DataStream or DataSet program.
    +
    +Registering a `DataStream` or `DataSet` as a table is discussed in the 
[Integration with DataStream and DataSet 
API](#integration-with-datastream-and-dataset-api) section.
     
     ### Register a Table
     
    -A `Table` that originates from a Table API operation or a SQL query is 
registered in a `TableEnvironment` as follows:
    +A `Table` is registered in a `TableEnvironment` as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -// works for StreamExecutionEnvironment identically
    -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    -BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
     
    -// convert a DataSet into a Table
    -Table custT = tableEnv
    -  .toTable(custDs, "name, zipcode")
    -  .where("zipcode = '12345'")
    -  .select("name");
    +// Table is the result of a simple projection query 
    +Table projX = tableEnv.scan("X").project(...);
     
    -// register the Table custT as table "custNames"
    -tableEnv.registerTable("custNames", custT);
    +// register the Table projX as table "projectedX"
    +tableEnv.registerTable("projectedX", projX);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -// works for StreamExecutionEnvironment identically
    -val env = ExecutionEnvironment.getExecutionEnvironment
    +// get a TableEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -// convert a DataSet into a Table
    -val custT = custDs
    -  .toTable(tableEnv, 'name, 'zipcode)
    -  .where('zipcode === "12345")
    -  .select('name)
    +// Table is the result of a simple projection query 
    +val projX: Table = tableEnv.scan("X").project(...)
     
    -// register the Table custT as table "custNames"
    -tableEnv.registerTable("custNames", custT)
    +// register the Table projX as table "projectedX"
    +tableEnv.registerTable("projectedX", projX)
     {% endhighlight %}
     </div>
     </div>
     
    -A registered `Table` that originates from a Table API operation or SQL 
query is treated similarly as a view as known from relational DBMS, i.e., it 
can be inlined when optimizing the query.
    +**Note:** A registered `Table` is treated similarly to a `VIEW` as known 
from relational database systems, i.e., the query that defines the `Table` is 
not optimized but will be inlined when another query references the registered 
`Table`. If multiple queries reference the same registered `Table`, it will be 
inlined for each referencing query and executed multiple times, i.e., the 
result of the registered `Table` will *not* be shared.
     
     {% top %}
     
    -### Register a DataSet
    +### Register a TableSource
    +
    +A `TableSource` provides access to external data which is stored in a 
storage systems such as a database (MySQL, HBase, ...), a file with specific 
encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system 
(Apache Kafka, RabbitMQ, ...). 
    +
    +Flink aims to provide TableSources for common data formats and storage 
systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl 
}}/dev/table/sourceSinks.html) page for a list of supported TableSources and 
instructions for how to build a custom `TableSource`.
     
    -A `DataSet` is registered as a `Table` in a `BatchTableEnvironment` as 
follows:
    +A `TableSource` is registered in a `TableEnvironment` as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    -BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
     
    -// register the DataSet cust as table "Customers" with fields derived from 
the dataset
    -tableEnv.registerDataSet("Customers", cust);
    +// create a TableSource
    +TableSource csvSource = new CsvTableSource("/path/to/file", ...);
     
    -// register the DataSet ord as table "Orders" with fields user, product, 
and amount
    -tableEnv.registerDataSet("Orders", ord, "user, product, amount");
    +// register the TableSource as table "CsvTable"
    +tableEnv.registerTableSource("CsvTable", csvSource);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -val env = ExecutionEnvironment.getExecutionEnvironment
    +// get a TableEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -// register the DataSet cust as table "Customers" with fields derived from 
the dataset
    -tableEnv.registerDataSet("Customers", cust)
    +// create a TableSource
    +val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
     
    -// register the DataSet ord as table "Orders" with fields user, product, 
and amount
    -tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
    +// register the TableSource as table "CsvTable"
    +tableEnv.registerTableSource("CsvTable", csvSource)
     {% endhighlight %}
     </div>
     </div>
     
    -*Note: The name of a `DataSet` `Table` must not match the 
`^_DataSetTable_[0-9]+` pattern which is reserved for internal use only.*
    -
     {% top %}
     
    -### Register a DataStream
    +Register an External Catalog
    +----------------------------
    +
    +An external catalog can provide information about external databases and 
tables such as their name, schema, statistics, and information for how to 
access data stored in an external database, table, or file.
     
    -A `DataStream` is registered as a `Table` in a `StreamTableEnvironment` as 
follows:
    +An external catalog can be created by implementing the `ExternalCatalog` 
interface and is registered in a `TableEnvironment` as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
     StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
     
    -// register the DataStream cust as table "Customers" with fields derived 
from the datastream
    -tableEnv.registerDataStream("Customers", cust);
    +// create an external catalog
    +ExternalCatalog catalog = new InMemoryExternalCatalog();
     
    -// register the DataStream ord as table "Orders" with fields user, 
product, and amount
    -tableEnv.registerDataStream("Orders", ord, "user, product, amount");
    +// register the ExternalCatalog catalog
    +tableEnv.registerExternalCatalog("InMemCatalog", catalog);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -val env = StreamExecutionEnvironment.getExecutionEnvironment
    +// get a TableEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -// register the DataStream cust as table "Customers" with fields derived 
from the datastream
    -tableEnv.registerDataStream("Customers", cust)
    +// create an external catalog
    +val catalog: ExternalCatalog = new InMemoryExternalCatalog
     
    -// register the DataStream ord as table "Orders" with fields user, 
product, and amount
    -tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)
    +// register the ExternalCatalog catalog
    +tableEnv.registerExternalCatalog("InMemCatalog", catalog)
     {% endhighlight %}
     </div>
     </div>
     
    -*Note: The name of a `DataStream` `Table` must not match the 
`^_DataStreamTable_[0-9]+` pattern which is reserved for internal use only.*
    +Once registered in a `TableEnvironment`, all tables defined in a 
`ExternalCatalog` can be accessed from Table API or SQL queries by specifying 
their full path, such as for example `catalog.database.table`.
    +
    +Currently, Flink provides an `InMemoryExternalCatalog` for demo and 
testing purposes. However, the `ExternalCatalog` interface can also be used to 
connect catalogs like HCatalog or Metastore to the Table API.
     
     {% top %}
     
    -### Register a TableSource
    +Query a Table 
    +-------------
    +
    +### Table API
    +
    +The Table API is a language-integrated query API for Scala and Java. In 
contrast to SQL, queries are not specified as Strings but are composed 
step-by-step in the host language. 
    +
    +The API is based on the `Table` class which represents a table (streaming 
or batch) and offers methods to apply relational operations. These methods 
return a new `Table` object, which represents the result of applying the 
relational operation on the input `Table`. Some relational operations are 
composed of multiple method calls such as `table.groupBy(...).select()`, where 
`groupBy(...)` specifies a grouping of `table` and `select(...)` the projection 
on the grouping of `table`.
     
    -TableSources provided access to data stored in various storage systems 
such as databases (MySQL, HBase, ...), file formats (CSV, Apache Parquet, Avro, 
ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...). Flink provides a 
TableSources for common data formats and storage systems. Please have a look at 
the [Table Sources and Sinks page]({{ site.baseurl 
}}/dev/table/sourceSinks.html) for a list of provided TableSources and 
documentation for how to built your own.
    +The [Table API]({{ site.baseurl }}/dev/table/tableapi.html) document 
describes all Table API operations that are supported on streaming and batch 
tables.
     
    -An external table is registered in a `TableEnvironment` using a 
`TableSource` as follows:
    +The following example shows a simple Table API aggregation query:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -// works for StreamExecutionEnvironment identically
    -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    -BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
     
    -TableSource custTS = new CsvTableSource("/path/to/file", ...);
    +// register Orders table
     
    -// register a `TableSource` as external table "Customers"
    -tableEnv.registerTableSource("Customers", custTS);
    +// scan registered Orders table
    +Table orders = tableEnv.scan("Orders");
    +// compute revenue for all customers from France
    +Table revenue = orders
    +  .filter("cCountry === 'FRANCE'")
    +  .groupBy("cID, cName")
    +  .select("cID, cName, revenue.sum AS revSum");
    +
    +// emit or convert Table
    +// execute query
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -// works for StreamExecutionEnvironment identically
    -val env = ExecutionEnvironment.getExecutionEnvironment
    +// get a TableEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -val custTS: TableSource = new CsvTableSource("/path/to/file", ...)
    +// register Orders table
     
    -// register a `TableSource` as external table "Customers"
    -tableEnv.registerTableSource("Customers", custTS)
    +// scan registered Orders table
    +Table orders = tableEnv.scan("Orders")
    +// compute revenue for all customers from France
    +Table revenue = orders
    +  .filter('cCountry === "FRANCE")
    +  .groupBy('cID, 'cName)
    +  .select('cID, 'cName, 'revenue.sum AS 'revSum)
     
    +// emit or convert Table
    +// execute query
     {% endhighlight %}
    +
    +**Note:** The Scala Table API uses Scala Symbols, which start with a 
single tick (`'`) to reference the attributes of a `Table`.
     </div>
     </div>
     
    -A `TableSource` can provide access to data stored in various storage 
systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache 
Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...).
    -
     {% top %}
     
    -Register an External Catalog
    -----------------------------
    +### SQL
     
    -An external catalog is defined by the `ExternalCatalog` interface and 
provides information about databases and tables such as their name, schema, 
statistics, and access information. An `ExternalCatalog` is registered in a 
`TableEnvironment` as follows: 
    +Flink's SQL integration is based on [Apache 
Calcite](https://calcite.apache.org) which implements the SQL standard. SQL 
queries are specified as regular Strings.
    +
    +The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes 
Flink's SQL support for streaming and batch tables.
    +
    +The following example shows how to specify a query and return the result 
as Table.
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -// works for StreamExecutionEnvironment identically
    -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    -BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    +
    +// register Orders table
     
    -ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
    +// compute revenue for all customers from France
    +Table revenue = tableEnv.sql(
    +    "SELECT cID, cName, SUM(revenue) AS revSum " +
    +    "FROM Orders " +
    +    "WHERE cCountry = 'FRANCE' " +
    +    "GROUP BY cID, cName"
    +  );
     
    -// register the ExternalCatalog customerCatalog
    -tableEnv.registerExternalCatalog("Customers", customerCatalog);
    +// emit or convert Table
    +// execute query
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -// works for StreamExecutionEnvironment identically
    -val env = ExecutionEnvironment.getExecutionEnvironment
    +// get a TableEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
    +// register Orders table
     
    -// register the ExternalCatalog customerCatalog
    -tableEnv.registerExternalCatalog("Customers", customerCatalog)
    +// compute revenue for all customers from France
    +Table revenue = tableEnv.sql(""" 
    +  |SELECT cID, cName, SUM(revenue) AS revSum
    +  |FROM Orders
    +  |WHERE cCountry = 'FRANCE'
    +  |GROUP BY cID, cName
    +  """.stripMargin)
     
    +// emit or convert Table
    +// execute query
     {% endhighlight %}
    +
     </div>
     </div>
     
    -Once registered in a `TableEnvironment`, all tables defined in a 
`ExternalCatalog` can be accessed from Table API or SQL queries by specifying 
their full path (`catalog`.`database`.`table`).
    +{% top %}
    +
    +### Mixing Table API and SQL
     
    -Currently, Flink provides an `InMemoryExternalCatalog` for demo and 
testing purposes. However, the `ExternalCatalog` interface can also be used to 
connect catalogs like HCatalog or Metastore to the Table API.
    +Table API and SQL queries can be easily mixed because both return `Table` 
objects:
    +
    +* A Table API query can be defined on the `Table` object returned by a SQL 
query.
    +* A SQL query can be defined on the result of a Table API query by 
[registering the resulting Table](#register-a-table) in the `TableEnvironment` 
and referencing it in the `FROM` clause of the SQL query.
     
     {% top %}
     
    -Create a Table from a DataStream or DataSet
    --------------------------------------------
    +Emit a Table 
    +------------
     
    -Besides registering a Table in a catalog, it is also possible to directly 
create a `Table` from a `DataStream` or `DataSet`. 
    +In order to emit a `Table`, it can be written to a `TableSink`. A 
`TableSink` is a generic interface to support a wide variety of file formats 
(e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache 
HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache 
Kafka, RabbitMQ). 
     
    -### Create a Table from a DataStream
    +A batch `Table` can only be written to a `BatchTableSink`, a streaming 
table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, 
or an `UpsertStreamTableSink`. 
     
    -**TODO**
    +Please see the documentation about [Table Sources & Sinks]({{ site.baseurl 
}}/dev/table/sourceSinks.html) for details about available sinks and 
instructions for how to implement a custom `TableSink`.
     
    -{% top %}
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get a StreamTableEnvironment, works for BatchTableEnvironment 
equivalently
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    +
    +// compute a result Table using Table API operators and/or SQL queries
    +Table result = ...
    +
    +// create a TableSink
    +TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
    +
    +// write the result Table to the TableSink
    +result.writeToSink(sink);
    +
    +// execute the program
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// get a TableEnvironment
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
     
    -### Create a Table from a DataSet
    +// compute a result Table using Table API operators and/or SQL queries
    +val result: Table = ...
     
    -**TODO**
    +// create a TableSink
    +val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
     
    -### Scala Implicit Conversion
    +// write the result Table to the TableSink
    +result.writeToSink(sink)
     
    -If you use the Scala API, A `DataSet` or `DataStream` can be implicitly 
converted into a `Table`.
    +// execute the program
    +{% endhighlight %}
    +</div>
    +</div>
     
     {% top %}
     
    -Query a Table 
    --------------
     
    -### Table API
    +Translate and Execute a Query
    +-----------------------------
    +
    +Table API and SQL queries are translated into [DataStream]({{ site.baseurl 
}}/dev/datastream_api.html) or [DataSet]({{ site.baseurl }}/dev/batch) programs 
depending on whether their input is a streaming or batch input. A query is 
internally represented as a logical query plan and is translated in two phases: 
     
    -**TODO**
    +1. optimization of the logical plan, 
    +2. translation into a DataStream or DataSet program.
    +
    +A Table API or SQL query is translated when:
    +
    +* the `Table` is emitted to a `TableSink`, i.e., when 
`Table.writeToSink()` is called.
    +* the `Table` is converted into a `DataStream` or `DataSet` (see 
[Integration with DataStream and DataSet 
API](#integration-with-dataStream-and-dataSet-api)).
    +
    +Once translated, a Table API or SQL query is handled like a regular 
DataStream or DataSet program and is executed when 
`StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is 
called.
     
     {% top %}
     
    -### SQL
    +Integration with DataStream and DataSet API
    +-------------------------------------------
    +
    +Table API and SQL queries can be easily integrated with and embedded into 
[DataStream]({{ site.baseurl }}/dev/datastream_api.html) and [DataSet]({{ 
site.baseurl }}/dev/batch) programs. For instance it is possible to query an 
external table (for example from a RDBMS), do some pre-processing, such as 
filtering, projecting, aggregating, or joining with meta data, before the data 
is processed with a custom DataStream or DataSet program (including any of the 
libraries on top of these APIs, such as CEP or Gelly). Inversely, a Table API 
or SQL query can also be applied on the result of a DataStream or DataSet 
program.
    +
    +This interaction can be achieved by converting a `DataStream` or `DataSet` 
into a `Table` and vice versa. In this section, we describe how these 
conversions are done.
    +
    +### Implicit Conversion for Scala
    +
    +The Scala Table API features implicit conversions for the `DataSet`, 
`DataStream`, and `Table` classes. These conversions are enabled by importing 
the package `org.apache.flink.table.api.scala._`
    +
    +### Register a DataStream or DataSet as Table
    +
    +A `DataStream` or `DataSet` can be registered in a `TableEnvironment` as a 
Table. The schema of the resulting table depends on the data type of the 
registered `DataStream` or `DataSet`. Please check the section about [mapping 
of data types to table schema](#mapping-of-data-types-to-table-schema) for 
details.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get StreamTableEnvironment. 
    +// registration of a DataSet in a BatchTableEnvironment is equivalent
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    +
    +DataStream<Tuple2<Long, String>> stream = ...
    +
    +// register the DataStream as Table "myTable" with fields "f0", "f1"
    +tableEnv.registerDataStream("myTable", stream);
    +
    +// register the DataStream as table "myTable2" with fields "myLong", 
"myString"
    +tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
    +{% endhighlight %}
    +</div>
     
    -**TODO**
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// get TableEnvironment. 
    +// registration of a DataSet is equivalent
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +val stream: DataStream[(Long, String)] = ...
    +
    +// register the DataStream as Table "myTable" with fields "f0", "f1"
    +tableEnv.registerDataStream("myTable", stream)
    +
    +// register the DataStream as table "myTable2" with fields "myLong", 
"myString"
    +tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +**Note:** The name of a `DataStream` `Table` must not match the 
`^_DataStreamTable_[0-9]+` pattern and the name of a `DataSet` `Table` must not 
match the `^_DataSetTable_[0-9]+` pattern. These patterns are reserved for 
internal use only.
     
     {% top %}
     
    -### Interoperability
    +### Convert a DataStream or DataSet into a Table
    +
    +Instead of registering a `DataStream` or `DataSet` in a 
`TableEnvironment`, it can also be directly converted into a `Table`. This is 
convenient if you want to use the Table in a Table API query. 
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get StreamTableEnvironment. 
    +// registration of a DataSet in a BatchTableEnvironment is equivalent
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    +
    +DataStream<Tuple2<Long, String>> stream = ...
     
    -**TODO**
    +// Convert the DataStream into a Table with default fields "f0", "f1"
    +Table table1 = tableEnv.fromDataStream(stream);
     
    -* Mix SQL and Table as you like
    -* Table API to SQL requires registered tables, register Table
    -* SQL to Table API just use resulting table
    +// Convert the DataStream into a Table with fields "myLong", "myString"
    +Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
    +{% endhighlight %}
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// get TableEnvironment. 
    +// registration of a DataSet is equivalent
    +val tableEnv = TableEnvironment.getTableEnvironment(env)
    +
    +val stream: DataStream[(Long, String)] = ...
    +
    +// convert the DataStream into a Table with default fields '_1, '_2
    +val table1: Table = tableEnv.fromDataStream(stream)
    +
    +// convert the DataStream into a Table with fields 'myLong, 'myString
    +val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
    +{% endhighlight %}
    +</div>
    +</div>
     
     {% top %}
     
    -Emit a Table 
    -------------
    +### Convert a Table into a DataStream or DataSet
    +
    +A `Table` can be converted into a `DataStream` or `DataSet`. This allows 
to run custom DataStream or DataSet programs on the result of a Table API or 
SQL query.
    +
    +When converting a `Table` into a `DataStream` or `DataSet`, you need to 
specify the data type of the resulting `DataStream` or `DataSet`, i.e., the 
data type into which the rows of the `Table` are to be converted. Often the 
most convenient conversion type is `Row`. The following list gives an overview 
of the features of the different options:
    +
    +- **Row**: fields are mapped by position, arbitrary number of fields, 
support for `null` values, no type-safe access.
    +- **POJO**: fields are mapped by name (POJO fields must be named as 
`Table` fields), arbitrary number of fields, support for `null` values, 
type-safe access.
    +- **Case Class**: fields are mapped by position, no support for `null` 
values, type-safe access.
    +- **Tuple**: fields are mapped by position, limitation to 22 (Scala) or 25 
(Java) fields, no support for `null` values, type-safe access.
    +- **Atomic Type**: `Table` must have a single field, no support for `null` 
values, type-safe access.
    +
    +#### Convert a Table into a DataStream
    +
    +A `Table` that is the result of a streaming query is dynamically changing, 
i.e., it is updated depending on the input streams of the query. Hence, the 
`DataStream` into which such a dynamic query is converted needs to encode the 
updates of the table. 
    +
    +There are two modes to convert a `Table` into a `DataStream`:
    +
    +1. **Append Mode**: This mode can only be used in the dynamic `Table` is 
only modified by `INSERT` changes, i.e, it is append-only and previously 
emitted results are never updated.
    +2. **Retract Mode**: This mode can always be used. It encodes `INSERT` and 
`DELETE` changes with a `boolean` flag.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// get StreamTableEnvironment. 
    +StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
    --- End diff --
    
    No, conversion of Table into DataSet is discussed below.


> Table API / SQL Docs: Common Page
> ---------------------------------
>
>                 Key: FLINK-6746
>                 URL: https://issues.apache.org/jira/browse/FLINK-6746
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation, Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>
> Update and refine ./docs/dev/table/common.md in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to