Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1955#discussion_r61755531
--- Diff: docs/apis/table.md ---
@@ -57,6 +52,170 @@ The following dependency must be added to your project
in order to use the Table
Note that the Table API is currently not part of the binary distribution.
See linking with it for cluster execution [here]({{ site.baseurl
}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+Registering and Accessing Tables
+--------------------------------
+
+`TableEnvironment`s have an internal table catalog to which tables can be
registered with a unique name. After registration, a table can be accessed from
the `TableEnvironment` by its name. Tables can be registered in different ways.
+
+*Note that it is not required to register a `DataSet` or `DataStream` as a
table in a `TableEnvironment` in order to process it with the Table API.*
+
+### Register a DataSet
+
+A `DataSet` is registered as a `Table` in a `BatchTableEnvironment` as
follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register the DataSet cust as table "Customers" with fields derived from
the dataset
+tableEnv.registerDataSet("Customers", cust)
+
+// register the DataSet ord as table "Orders" with fields user, product,
and amount
+tableEnv.registerDataSet("Orders", ord, "user, product, amount");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register the DataSet cust as table "Customers" with fields derived from
the dataset
+tableEnv.registerDataSet("Customers", cust)
+
+// register the DataSet ord as table "Orders" with fields user, product,
and amount
+tableEnv.registerDataSet("Orders", ord, 'user, 'product, 'amount)
+{% endhighlight %}
+</div>
+</div>
+
+*Note: DataSet table names are not allowed to follow the
`^_DataSetTable_[0-9]+` pattern, as these are reserved for internal use only.*
+
+### Register a DataStream
+
+A `DataStream` is registered as a `Table` in a `StreamTableEnvironment` as
follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
+
+// register the DataStream cust as table "Customers" with fields derived
from the datastream
+tableEnv.registerDataStream("Customers", cust)
+
+// register the DataStream ord as table "Orders" with fields user,
product, and amount
+tableEnv.registerDataStream("Orders", ord, "user, product, amount");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register the DataStream cust as table "Customers" with fields derived
from the datastream
+tableEnv.registerDataStream("Customers", cust)
+
+// register the DataStream ord as table "Orders" with fields user,
product, and amount
+tableEnv.registerDataStream("Orders", ord, 'user, 'product, 'amount)
+{% endhighlight %}
+</div>
+</div>
+
+*Note: DataStream table names are not allowed to follow the
`^_DataStreamTable_[0-9]+` pattern, as these are reserved for internal use
only.*
+
+### Register a Table
+
+A `Table` that originates from a Table API operation or a SQL query is
registered in a `TableEnvironemnt` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// convert a DataSet into a Table
+Table custT = tableEnv
+ .toTable(custDs, "name, zipcode")
+ .where("zipcode = '12345'")
+ .select("name")
+
+// register the Table custT as table "custNames"
+tableEnv.registerTable("custNames", custT)
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// convert a DataSet into a Table
+val custT = custDs
+ .toTable(tableEnv, 'name, 'zipcode)
+ .where('zipcode === "12345")
+ .select('name)
+
+// register the Table custT as table "custNames"
+tableEnv.registerTable("custNames", custT)
+{% endhighlight %}
+</div>
+</div>
+
+A registered `Table` that originates from a Table API operation or SQL
query is treated similarly as a view as known from relational DBMS, i.e., it
can be inlined when optimizing the query.
+
+### Register an external Table using a TableSource
+
+An external table is registered in a `TableEnvironment` using a
`TableSource` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+TableSource custTS = new CsvTableSource("/path/to/file", ...)
+
+// register a `TableSource` as external table "Customers"
+tableEnv.registerTableSource("Customers", custTS)
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+val custTS: TableSource = new CsvTableSource("/path/to/file", ...)
+
+// register a `TableSource` as external table "Customers"
+tableEnv.registerTableSource("Customers", custTS)
+
+{% endhighlight %}
+</div>
+</div>
+
+A `TableSource` can provide access to data stored in various storage
systems such as databases (MySQL, HBase, ...), file formats (CSV, Apache
Parquet, Avro, ORC, ...), or messaging systems (Apache Kafka, RabbitMQ, ...).
+
+Currently, Flink only provides a `CsvTableSource` to read CSV files. A
custom `TableSource` can be defined by implementing the `BatchTableSource` or
`StreamTableSource` interface.
+
+### Access a registered Table
--- End diff --
This section can be removed if we move the accessing to the respective
sections (Table API / SQL).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---