lirui-apache commented on a change in pull request #14145:
URL: https://github.com/apache/flink/pull/14145#discussion_r527368062
##########
File path: docs/dev/table/hive/hive_read_write.md
##########
@@ -22,119 +22,199 @@ specific language governing permissions and limitations
under the License.
-->
-Using the `HiveCatalog` and Flink's connector to Hive, Flink can read and
write from Hive data as an alternative to Hive's batch engine.
-Be sure to follow the instructions to include the correct [dependencies]({{
site.baseurl }}/dev/table/hive/#depedencies) in your application.
-And please also note that Hive connector only works with blink planner.
+Using the HiveCatalog, Apache Flink can be used for unified `BATCH` and `STREAMING`
processing of Apache
+Hive tables. This means Flink can be used as a more performant alternative to
Hive’s batch engine,
+or to continuously read and write data into and out of Hive tables to power
real-time data
+warehousing applications.
+
+<div class="alert alert-info">
+ <b>IMPORTANT:</b> Reading and writing to and from Apache Hive is only
supported by the Blink table planner.
+</div>
* This will be replaced by the TOC
{:toc}
-## Reading From Hive
+## Reading
-Assume Hive contains a single table in its `default` database, named people
that contains several rows.
+Flink supports reading data from Hive in both `BATCH` and `STREAMING` modes.
When run as a `BATCH`
+application, Flink will execute its query over the state of the table at the
point in time when the
+query is executed. `STREAMING` reads will continuously monitor the table and
incrementally fetch
+new data as it is made available. Flink will read tables as bounded by default.
+
+`STREAMING` reads support consuming both partitioned and non-partitioned
tables.
+For partitioned tables, Flink will monitor the generation of new partitions,
and read
+them incrementally when available. For non-partitioned tables, Flink will
monitor the generation
+of new files in the folder and read new files incrementally.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>streaming-source.enable</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable streaming source or not. NOTES: Please make sure that each
partition/file is written atomically; otherwise the reader may get
incomplete data.</td>
+ </tr>
+ <tr>
+ <td><h5>streaming-source.monitor-interval</h5></td>
+ <td style="word-wrap: break-word;">1 m</td>
+ <td>Duration</td>
+      <td>Time interval for continuously monitoring partitions/files.</td>
+ </tr>
+ <tr>
+ <td><h5>streaming-source.consume-order</h5></td>
+ <td style="word-wrap: break-word;">create-time</td>
+ <td>String</td>
+      <td>The consume order of the streaming source; supports create-time and
partition-time. create-time compares the partition/file creation time; this is not
the partition creation time in the Hive metastore, but the folder/file modification
time in the filesystem. partition-time compares the time extracted from the partition name.
If the partition folder is updated, e.g. a new file is added to the folder, it
can affect how the data is consumed. For non-partitioned tables, this value should
always be 'create-time'.</td>
+ </tr>
+ <tr>
+ <td><h5>streaming-source.consume-start-offset</h5></td>
+ <td style="word-wrap: break-word;">1970-00-00</td>
+ <td>String</td>
+      <td>Start offset for streaming consumption. How to parse and compare
offsets depends on the consume order. For create-time and partition-time, the value should be a
timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the
partition time extractor will be used to extract the time from the partition.</td>
+ </tr>
+ </tbody>
+</table>
+
+[SQL Hints]({% link dev/table/sql/hints.md %}) can be used to apply
configurations to a Hive table
+without changing its definition in the Hive metastore.
+
+{% highlight sql %}
+
+SELECT *
+FROM hive_table
+/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.consume-start-offset'='2020-05-20') */;
-{% highlight bash %}
-hive> show databases;
-OK
-default
-Time taken: 0.841 seconds, Fetched: 1 row(s)
-
-hive> show tables;
-OK
-Time taken: 0.087 seconds
-
-hive> CREATE TABLE mytable(name string, value double);
-OK
-Time taken: 0.127 seconds
-
-hive> SELECT * FROM mytable;
-OK
-Tom 4.72
-John 8.0
-Tom 24.2
-Bob 3.14
-Bob 4.72
-Tom 34.9
-Mary 4.79
-Tiff 2.72
-Bill 4.33
-Mary 77.7
-Time taken: 0.097 seconds, Fetched: 10 row(s)
{% endhighlight %}
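As a further illustration, the monitoring interval and consume order can be passed through the same hint mechanism. The following is only a sketch: `hive_table` is a hypothetical partitioned table whose partition values encode a timestamp, which is what `partition-time` ordering relies on.

{% highlight sql %}
SELECT *
FROM hive_table
/*+ OPTIONS('streaming-source.enable'='true',
            'streaming-source.consume-order'='partition-time',
            'streaming-source.monitor-interval'='10 min') */;
{% endhighlight %}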
-With the data ready your can connect to Hive [connect to an existing Hive
installation]({{ site.baseurl }}/dev/table/hive/#connecting-to-hive) and begin
querying.
+**Notes**
-{% highlight bash %}
+- The monitoring strategy is to scan all directories/files currently in the location
path. Many partitions may cause performance degradation.
+- Streaming reads for non-partitioned tables require that each file be
written atomically into the target directory.
+- Streaming reads for partitioned tables require that each partition be
added atomically in the view of the Hive metastore. If not, new data added to an
existing partition will be consumed.
+- Streaming reads do not support watermark syntax in Flink DDL, so these tables
cannot be used with window operators.
-Flink SQL> show catalogs;
-myhive
-default_catalog
+## Reading Hive Views
-# ------ Set the current catalog to be 'myhive' catalog if you haven't set it
in the yaml file ------
+Flink is able to read from Hive-defined views, but some limitations apply (see the sketch after this list):
-Flink SQL> use catalog myhive;
+1) The Hive catalog must be set as the current catalog before you can query
the view.
+This can be done by either `tableEnv.useCatalog(...)` in Table API or `USE
CATALOG ...` in SQL Client.
-# ------ See all registered database in catalog 'mytable' ------
+2) Hive and Flink SQL have different syntax, e.g. different reserved keywords
and literals.
+Make sure the view’s query is compatible with Flink grammar.
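For example, assuming a Hive catalog registered as `myhive` that contains a view named `view_table` (both names are hypothetical), a query might look like this sketch:

{% highlight sql %}
-- make the Hive catalog the current catalog, then query the view
USE CATALOG myhive;
SELECT * FROM view_table;
{% endhighlight %}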
-Flink SQL> show databases;
-default
+### Temporal Table Join
Review comment:
I wonder whether we should list all the source tables that can be used
as temporal tables (maybe in [this
page](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html#temporal-table)).
And then we can put a cross link here. It's annoying for users to have to
check each connector doc to decide whether a table can be used in temporal
join. And maybe users don't even expect to find such information in the
"read/write" page.
##########
File path: docs/dev/table/hive/hive_read_write.md
##########
@@ -22,119 +22,199 @@ specific language governing permissions and limitations
under the License.
-->
-# ------ See the previously registered table 'mytable' ------
+You can use a Hive table as a temporal table and join streaming data with it.
Please follow
+the [example]({% link dev/table/streaming/temporal_tables.md
%}#temporal-table) to find
+out how to join a temporal table.
-Flink SQL> show tables;
-mytable
+When performing the join, the Hive table will be cached in slot memory and each record from
each record from
+the stream is joined against the table by key to decide whether a match is
found. Using a Hive
+table as a temporal table does not require any additional configuration.
Optionally, you can
+configure the TTL of the Hive table cache with the following property. After
the cache expires,
+the Hive table will be scanned again to load the latest data.
-# ------ The table schema that Flink sees is the same that we created in Hive,
two columns - name as string and value as double ------
-Flink SQL> describe mytable;
-root
- |-- name: name
- |-- type: STRING
- |-- name: value
- |-- type: DOUBLE
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>lookup.join.cache.ttl</h5></td>
+ <td style="word-wrap: break-word;">60 min</td>
+ <td>Duration</td>
+ <td>The cache TTL (e.g. 10min) for the build table in lookup join. By
default the TTL is 60 minutes.</td>
+ </tr>
+ </tbody>
+</table>
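As a sketch of what such a join can look like, the example below joins a hypothetical stream of orders against a hypothetical Hive dimension table. `orders`, `dim_table`, and their columns are illustrative names, `o.proctime` is assumed to be a processing-time attribute of the stream, and the TTL hint is optional:

{% highlight sql %}
SELECT o.order_id, o.amount, d.product_name
FROM orders AS o
JOIN dim_table /*+ OPTIONS('lookup.join.cache.ttl'='12 h') */
  FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.product_id = d.product_id;
{% endhighlight %}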
-# ------ Select from hive table or hive view ------
-Flink SQL> SELECT * FROM mytable;
+**Notes**:
- name value
-__________ __________
+- Each joining subtask needs to keep its own cache of the Hive table. Please
ensure the Hive table can fit into
+the memory of a TM task slot.
+- It is encouraged to set a relatively large value for
`lookup.join.cache.ttl`. Otherwise, jobs
+are prone to performance issues as the table needs to be updated and reloaded
too frequently.
+- Currently, Flink simply loads the whole Hive table whenever the cache needs
to be refreshed.
+There is no way to differentiate new data from old.
- Tom 4.72
- John 8.0
- Tom 24.2
- Bob 3.14
- Bob 4.72
- Tom 34.9
- Mary 4.79
- Tiff 2.72
- Bill 4.33
- Mary 77.7
+### Vectorized Optimization upon Read
+Flink will automatically use vectorized reads of Hive tables when the
following conditions are met:
+
+- Format: ORC or Parquet.
+- Columns without complex data types, such as the Hive types List, Map, Struct, and Union.
+
+This feature is enabled by default.
+It may be disabled with the following configuration.
+
+{% highlight bash %}
+table.exec.hive.fallback-mapred-reader=true
{% endhighlight %}
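For instance, the flag can be set for the current session from the SQL Client (a sketch; the same key can also be set on the `TableConfig` in the Table API):

{% highlight sql %}
SET table.exec.hive.fallback-mapred-reader=true;
{% endhighlight %}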
-### Querying Hive views
+### Source Parallelism Inference
-If you need to query Hive views, please note:
+By default, Flink will infer the optimal parallelism for its Hive readers
+based on the number of files and the number of blocks in each file.
-1. You have to use the Hive catalog as your current catalog before you can
query views in that catalog. It can be done by either
`tableEnv.useCatalog(...)` in Table API or `USE CATALOG ...` in SQL Client.
-2. Hive and Flink SQL have different syntax, e.g. different reserved keywords
and literals. Make sure the view's query is compatible with Flink grammar.
+Flink allows you to flexibly configure the parallelism inference policy. You can configure the
+following parameters in `TableConfig` (note that these parameters affect all
sources of the job; an example follows the table):
-## Writing To Hive
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+      <td>If it is true, source parallelism is inferred according to the number of
splits. If it is false, the parallelism of the source is set by config.</td>
+ </tr>
+ <tr>
+ <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+      <td>Sets the maximum inferred parallelism for the source operator.</td>
+ </tr>
+ </tbody>
+</table>
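As an illustration, the two options above could be tuned per session, for example from the SQL Client (a sketch; in the Table API the same keys go onto the `TableConfig`):

{% highlight sql %}
SET table.exec.hive.infer-source-parallelism=true;
SET table.exec.hive.infer-source-parallelism.max=100;
{% endhighlight %}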
-Similarly, data can be written into hive using an `INSERT` clause.
+## Writing
-Consider there is an example table named "mytable" with two columns: name and
age, in string and int type.
+Flink supports writing data to Hive in both `BATCH` and `STREAMING` modes. When run as a `BATCH`
+application, Flink will write to a Hive table, only making those records
visible when the job finishes.
+`BATCH` writes support both appending to and overwriting existing tables.
-{% highlight bash %}
+{% highlight sql %}
# ------ INSERT INTO will append to the table or partition, keeping the
existing data intact ------
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
# ------ INSERT OVERWRITE will overwrite any existing data in the table or
partition ------
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
{% endhighlight %}
-We support partitioned table too, Consider there is a partitioned table named
myparttable with four columns: name, age, my_type and my_date, in types ......
my_type and my_date are the partition keys.
+Data can also be inserted into a particular partitions.
Review comment:
```suggestion
Data can also be inserted into particular partitions.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]