Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4012#discussion_r119077032
--- Diff: docs/dev/table/common.md ---
@@ -98,374 +89,767 @@ env.execute("Your Query")
</div>
</div>
+**Note:** Table API and SQL queries can be easily integrated with and
embedded into DataStream or DataSet programs. Have a look at the [Integration
with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)
section to learn how DataStreams and DataSets can be converted into Tables and
vice versa.
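
For example, a minimal Java sketch of such a round trip, assuming `StreamTableEnvironment`'s `fromDataStream` and `toAppendStream` methods and illustrative field names:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// a DataStream of (name, score) pairs (illustrative data)
DataStream<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("Alice", 1));

// DataStream -> Table with explicit field names
Table table = tableEnv.fromDataStream(stream, "name, score");

// Table -> append-only DataStream of Rows
DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
{% endhighlight %}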
+
{% top %}
Create a TableEnvironment
-------------------------
-A `Table` is always bound to a specific `TableEnvironment`. It is not
possible to combine Tables of different TableEnvironments.
+The `TableEnvironment` is a central concept of the Table API and SQL
integration. It is responsible for:
+* Registering a `Table` in the internal catalog
+* Registering an external catalog
+* Executing SQL queries
+* Registering a user-defined (scalar, table, or aggregation) function
+* Converting a `DataStream` or `DataSet` into a `Table`
+* Holding a reference to an `ExecutionEnvironment` or
`StreamExecutionEnvironment`
+
+A `Table` is always bound to a specific `TableEnvironment`. It is not
possible to process tables of different TableEnvironments in the same query, e.g., to join
or union them.
+
+A `TableEnvironment` is created by calling the static
`TableEnvironment.getTableEnvironment()` method with a
`StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional
`TableConfig`. The `TableConfig` can be used to configure the
`TableEnvironment` or to customize the query optimization and translation
process (see [Query Optimization](#query-optimization)).
-**TODO: Extend**
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// ***************
+// STREAMING QUERY
+// ***************
+StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+// Create a TableEnvironment for streaming queries
+StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
+
+// ***********
+// BATCH QUERY
+// ***********
+ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+// Create a TableEnvironment for batch queries
+BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// ***************
+// STREAMING QUERY
+// ***************
+val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
+// Create a TableEnvironment for streaming queries
+val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
+
+// ***********
+// BATCH QUERY
+// ***********
+val bEnv = ExecutionEnvironment.getExecutionEnvironment
+// Create a TableEnvironment for batch queries
+val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
+{% endhighlight %}
+</div>
+</div>
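
A minimal sketch of passing the optional `TableConfig`, assuming the `getTableEnvironment` overload that accepts one:

{% highlight java %}
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// optional TableConfig used to customize query optimization and translation
TableConfig config = new TableConfig();
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv, config);
{% endhighlight %}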
{% top %}
Register a Table in the Catalog
-------------------------------
-`TableEnvironment`s have an internal table catalog to which tables can be
registered with a unique name. After registration, a table can be accessed from
the `TableEnvironment` by its name.
+A `TableEnvironment` has an internal catalog to register tables by name.
Table API or SQL queries can access tables that are registered in the catalog
by referencing them by name.
-*Note: `DataSet`s or `DataStream`s can be directly converted into `Table`s
without registering them in the `TableEnvironment`. See [Create a Table from a
DataStream or DataSet](#tbd) for details.
+A `TableEnvironment` allows registering a table from various sources:
+* an existing `Table` object, usually the result of a Table API or SQL
query.
+* a `TableSource`, which accesses external data, such as a file, database,
or messaging system.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+
+Registering a `DataStream` or `DataSet` as a table is discussed in the
[Integration with DataStream and DataSet
API](#integration-with-datastream-and-dataset-api) section.
### Register a Table
-A `Table` that originates from a Table API operation or a SQL query is
registered in a `TableEnvironment` as follows:
+A `Table` is registered in a `TableEnvironment` as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-// works for StreamExecutionEnvironment identically
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-// convert a DataSet into a Table
-Table custT = tableEnv
- .toTable(custDs, "name, zipcode")
- .where("zipcode = '12345'")
- .select("name");
+// Table is the result of a simple projection query
+Table projX = tableEnv.scan("X").select(...);
-// register the Table custT as table "custNames"
-tableEnv.registerTable("custNames", custT);
+// register the Table projX as table "projectedX"
+tableEnv.registerTable("projectedX", projX);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-// works for StreamExecutionEnvironment identically
-val env = ExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
-// convert a DataSet into a Table
-val custT = custDs
- .toTable(tableEnv, 'name, 'zipcode)
- .where('zipcode === "12345")
- .select('name)
+// Table is the result of a simple projection query
+val projX: Table = tableEnv.scan("X").select(...)
-// register the Table custT as table "custNames"
-tableEnv.registerTable("custNames", custT)
+// register the Table projX as table "projectedX"
+tableEnv.registerTable("projectedX", projX)
{% endhighlight %}
</div>
</div>
-A registered `Table` that originates from a Table API operation or SQL
query is treated similarly as a view as known from relational DBMS, i.e., it
can be inlined when optimizing the query.
+**Note:** A registered `Table` is treated similarly to a `VIEW` as known
from relational database systems, i.e., the query that defines the `Table` is
not optimized but will be inlined when another query references the registered
`Table`. If multiple queries reference the same registered `Table`, it will be
inlined for each referencing query and executed multiple times, i.e., the
result of the registered `Table` will *not* be shared.
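
For illustration, a minimal sketch of two queries against the same registered table (the field names `a` and `b` are assumptions); the query defining `projectedX` is inlined into both plans and evaluated once per query:

{% highlight java %}
// both queries reference the registered table "projectedX";
// its defining query is inlined into each plan and executed once per result
Table resultA = tableEnv.scan("projectedX").select("a");
Table resultB = tableEnv.scan("projectedX").select("b");
{% endhighlight %}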
{% top %}
-### Register a DataSet
+### Register a TableSource
+
+A `TableSource` provides access to external data that is stored in a
storage system such as a database (MySQL, HBase, ...), a file with a specific
encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system
(Apache Kafka, RabbitMQ, ...).
+
+Flink aims to provide TableSources for common data formats and storage
systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl
}}/dev/table/sourceSinks.html) page for a list of supported TableSources and
instructions for how to build a custom `TableSource`.
-A `DataSet` is registered as a `Table` in a `BatchTableEnvironment` as
follows:
+A `TableSource` is registered in a `TableEnvironment` as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-// register the DataSet cust as table "Customers" with fields derived from
the dataset
-tableEnv.registerDataSet("Customers", cust);
+// create a TableSource
+TableSource csvSource = new CsvTableSource("/path/to/file", ...);
-// register the DataSet ord as table "Orders" with fields user, product,
and amount
-tableEnv.registerDataSet("Orders", ord, "user, product, amount");
+// register the TableSource as table "CsvTable"
+tableEnv.registerTableSource("CsvTable", csvSource);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
-// register the DataSet cust as table "Customers" with fields derived from
the dataset
-tableEnv.registerDataSet("Customers", cust)
+// create a TableSource
+val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
-// register the DataSet ord as table "Orders" with fields user, product,
and amount
-tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
+// register the TableSource as table "CsvTable"
+tableEnv.registerTableSource("CsvTable", csvSource)
{% endhighlight %}
</div>
</div>
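
A registered `TableSource` can then be referenced by name in a query. A minimal sketch, assuming a `CsvTableSource` constructor taking a path, field names, and field types (all values below are illustrative):

{% highlight java %}
// a concrete CsvTableSource (path, field names, and types are illustrative)
TableSource csvSource = new CsvTableSource(
    "/path/to/file",
    new String[]{"name", "score"},
    new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});

// register the TableSource and reference it by name in a query
tableEnv.registerTableSource("CsvTable", csvSource);
Table csvTable = tableEnv.scan("CsvTable");
{% endhighlight %}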
-*Note: The name of a `DataSet` `Table` must not match the
`^_DataSetTable_[0-9]+` pattern which is reserved for internal use only.*
-
{% top %}
-### Register a DataStream
+Register an External Catalog
+----------------------------
+
+An external catalog can provide information about external databases and
tables, such as their name, schema, and statistics, as well as information about how to
access data stored in an external database, table, or file.
-A `DataStream` is registered as a `Table` in a `StreamTableEnvironment` as
follows:
+An external catalog can be created by implementing the `ExternalCatalog`
interface and is registered in a `TableEnvironment` as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-// register the DataStream cust as table "Customers" with fields derived
from the datastream
-tableEnv.registerDataStream("Customers", cust);
+// create an external catalog
+ExternalCatalog catalog = new InMemoryExternalCatalog();
-// register the DataStream ord as table "Orders" with fields user,
product, and amount
-tableEnv.registerDataStream("Orders", ord, "user, product, amount");
+// register the ExternalCatalog catalog
+tableEnv.registerExternalCatalog("InMemCatalog", catalog);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
+// get a TableEnvironment
--- End diff --
Add `// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently`?