sjwiesman commented on a change in pull request #14156:
URL: https://github.com/apache/flink/pull/14156#discussion_r527895688
##########
File path: docs/dev/table/connectors/index.md
##########
@@ -95,20 +95,23 @@ Flink natively support various connectors. The following tables list all availab
How to use connectors
--------
-Flink supports to use SQL CREATE TABLE statement to register a table. One can define the table name, the table schema, and the table options for connecting to an external system.
+Flink supports to use SQL `CREATE TABLE` statement to register a table. One can define the table name,
+the table schema, and the table options for connecting to an external system.
Review comment:
```suggestion
Flink supports using SQL `CREATE TABLE` statements to register tables. One can define the table name,
the table schema, and the table options for connecting to an external system.
```
##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
{% endhighlight %}
-Creates a table with the given name. If a table with the same name already exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
-A computed column is a virtual column that is generated using the syntax "`column_name AS computed_column_expression`". It is generated from a non-query expression that uses other columns in the same table and is not physically stored within the table. For example, a computed column could be defined as `cost AS price * quantity`. The expression may contain any combination of physical column, constant, function, or variable. The expression cannot contain a subquery.
+Physical columns are regular columns known from databases. They define the names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the payload that is read from
+and written to an external system. Connectors and formats are using these columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between physical columns but will not
+influence the final physical schema.
-Computed columns are commonly used in Flink for defining [time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html) in CREATE TABLE statements.
-A [processing time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#processing-time) can be defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function.
-On the other hand, computed column can be used to derive event time column because an event time column may need to be derived from existing fields, e.g. the original field is not `TIMESTAMP(3)` type or is nested in a JSON string.
+The following statement creates a table with only regular columns:
-Notes:
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING
+) WITH (
+ ...
+);
+{% endhighlight %}
-- A computed column defined on a source table is computed after reading from the source, it can be used in the following SELECT query statements.
-- A computed column cannot be the target of an INSERT statement. In INSERT statements, the schema of SELECT clause should match the schema of the target table without computed columns.
+**Metadata Columns**
+
+Metadata columns are an extension to the SQL standard and allow access to connector and/or format specific
+fields for every row of a table. A metadata column is indicated by the `METADATA` keyword. For example,
+a metadata column can be used to read and write the timestamp from and to Kafka records for time-based
+operations. The [connector and format documentation]({% link dev/table/connectors/index.md %}) lists the
+available metadata fields for every component. However, declaring a metadata column in a table's schema
+is optional.
+
+The following statement creates a table with an additional metadata column that references the metadata field `timestamp`:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING,
+ `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
-**WATERMARK**
+Every metadata field is identified by a string-based key and has a documented data type. For example,
+the Kafka connector exposes a metadata field with key `timestamp` and data type `TIMESTAMP(3) WITH LOCAL TIME ZONE`
+that can be used for both reading and writing records.
-The `WATERMARK` defines the event time attributes of a table and takes the form `WATERMARK FOR rowtime_column_name AS watermark_strategy_expression`.
+In the example above, the metadata column `record_time` becomes part of the table's schema and can be
+transformed and stored like a regular column:
+
+{% highlight sql %}
+INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
+{% endhighlight %}
+
+For convenience, the `FROM` clause can be omitted if the column name should be used as the identifying metadata key:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING,
+ `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA -- use column name as metadata key
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
+
+For convenience, the runtime will perform an explicit cast if the data type of the column differs from
+the data type of the metadata field. Of course this requires that the two data types are compatible.
Review comment:
```suggestion
the data type of the metadata field. Of course, this requires that the two data types are compatible.
```
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -189,6 +362,130 @@ Connector Options
Features
----------------
+
+### Key and Value Formats
+
+Both the key and value part of a Kafka record can be serialized to and deserialized from raw bytes using
+one of the given [formats]({% link dev/table/connectors/formats/index.md %}).
+
+**Value Format**
+
+Since a key is optional in Kafka records, the following statement reads and writes records with a configured
+value format but without a key format. The `'format'` option is a synonym for `'value.format'`. All format
+options are prefixed with the format identifier.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (
+ `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING
+) WITH (
+ 'connector' = 'kafka',
+ ...
+
+ 'format' = 'json',
+ 'json.ignore-parse-errors' = 'true'
+)
+{% endhighlight %}
+</div>
+</div>
+
+The value format will be configured with the following data type:
+
+{% highlight text %}
+ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
+{% endhighlight %}
+
+**Key and Value Format**
+
+The following example shows how to specify and configure key and value formats. The format options are
+prefixed with either `'key'` or `'value'` plus the format identifier.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (
+ `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING
+) WITH (
+ 'connector' = 'kafka',
+ ...
+
+ 'key.format' = 'json',
+ 'key.json.ignore-parse-errors' = 'true',
+ 'key.fields' = 'user_id;item_id',
+
+ 'value.format' = 'json',
+ 'value.json.fail-on-missing-field' = 'false',
+ 'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+The key format includes the fields listed in `'key.fields'` (in the same order). Thus, it will be configured with
Review comment:
Can you specify the delimiter? I'm guessing `;` based on the example but I'm not positive.
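
If it is `;`, I'd expect the key format in this example to end up configured with something like the following (mirroring how the value format's data type is shown above; just my guess, please confirm):

{% highlight text %}
ROW<`user_id` BIGINT, `item_id` BIGINT>
{% endhighlight %}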
##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -85,44 +82,46 @@ The following metadata can be exposed as read-only (`VIRTUAL`) columns in a tabl
</thead>
<tbody>
<tr>
- <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">'timestamp'</a></code></td>
- <td><code>TIMESTAMP(3) WITH LOCAL TIMEZONE NOT NULL</code></td>
+ <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">timestamp</a></code></td>
+ <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL</code></td>
<td>The approximate time when the record was inserted into the stream.</td>
</tr>
<tr>
- <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">'shard-id'</a></code></td>
+ <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">shard-id</a></code></td>
<td><code>VARCHAR(128) NOT NULL</code></td>
<td>The unique identifier of the shard within the stream from which the record was read.</td>
</tr>
<tr>
- <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">'sequence-number'</a></code></td>
+ <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">sequence-number</a></code></td>
<td><code>VARCHAR(128) NOT NULL</code></td>
<td>The unique identifier of the record within its shard.</td>
</tr>
</tbody>
</table>
-The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata columns:
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
<div class="codetabs" markdown="1">
<div data-lang="SQL" markdown="1">
{% highlight sql %}
CREATE TABLE KinesisTable (
- user_id BIGINT,
- item_id BIGINT,
- category_id BIGINT,
- behavior STRING,
- ts TIMESTAMP(3),
- arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
- shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
- sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
-) PARTITIONED BY (user_id, item_id) WITH (
- 'connector' = 'kinesis',
- 'stream' = 'user_behavior',
- 'aws.region' = 'us-east-2',
- 'scan.stream.initpos' = 'LATEST',
- 'format' = 'csv'
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `category_id` BIGINT,
+ `behavior` STRING,
+ `ts` TIMESTAMP(3),
+ `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
Review comment:
On the debezium page you said only Kafka can forward metadata. Is that not true, or did I misunderstand what you said?
##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
##########
@@ -150,13 +150,16 @@ public Object convert(GenericRowData row, int pos) {
INGESTION_TIMESTAMP(
"ingestion-timestamp",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
Review comment:
Is this supposed to be part of this PR?
##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
{% endhighlight %}
-Creates a table with the given name. If a table with the same name already exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
-A computed column is a virtual column that is generated using the syntax "`column_name AS computed_column_expression`". It is generated from a non-query expression that uses other columns in the same table and is not physically stored within the table. For example, a computed column could be defined as `cost AS price * quantity`. The expression may contain any combination of physical column, constant, function, or variable. The expression cannot contain a subquery.
+Physical columns are regular columns known from databases. They define the names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the payload that is read from
+and written to an external system. Connectors and formats are using these columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between physical columns but will not
+influence the final physical schema.
Review comment:
```suggestion
Physical columns are regular columns known from databases. They define the names, the types, and the
order of fields in the physical data. Thus, physical columns represent the payload that is read from
and written to an external system. Connectors and formats use these columns (in the defined order)
to configure themselves. Other kinds of columns can be declared between physical columns but will not
influence the final physical schema.
```
##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
{% endhighlight %}
-Creates a table with the given name. If a table with the same name already exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
-A computed column is a virtual column that is generated using the syntax "`column_name AS computed_column_expression`". It is generated from a non-query expression that uses other columns in the same table and is not physically stored within the table. For example, a computed column could be defined as `cost AS price * quantity`. The expression may contain any combination of physical column, constant, function, or variable. The expression cannot contain a subquery.
+Physical columns are regular columns known from databases. They define the names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the payload that is read from
+and written to an external system. Connectors and formats are using these columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between physical columns but will not
+influence the final physical schema.
-Computed columns are commonly used in Flink for defining [time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html) in CREATE TABLE statements.
-A [processing time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#processing-time) can be defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function.
-On the other hand, computed column can be used to derive event time column because an event time column may need to be derived from existing fields, e.g. the original field is not `TIMESTAMP(3)` type or is nested in a JSON string.
+The following statement creates a table with only regular columns:
-Notes:
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING
+) WITH (
+ ...
+);
+{% endhighlight %}
-- A computed column defined on a source table is computed after reading from the source, it can be used in the following SELECT query statements.
-- A computed column cannot be the target of an INSERT statement. In INSERT statements, the schema of SELECT clause should match the schema of the target table without computed columns.
+**Metadata Columns**
+
+Metadata columns are an extension to the SQL standard and allow access to connector and/or format specific
+fields for every row of a table. A metadata column is indicated by the `METADATA` keyword. For example,
+a metadata column can be used to read and write the timestamp from and to Kafka records for time-based
+operations. The [connector and format documentation]({% link dev/table/connectors/index.md %}) lists the
+available metadata fields for every component. However, declaring a metadata column in a table's schema
+is optional.
+
+The following statement creates a table with an additional metadata column that references the metadata field `timestamp`:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING,
+ `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
-**WATERMARK**
+Every metadata field is identified by a string-based key and has a documented data type. For example,
+the Kafka connector exposes a metadata field with key `timestamp` and data type `TIMESTAMP(3) WITH LOCAL TIME ZONE`
+that can be used for both reading and writing records.
-The `WATERMARK` defines the event time attributes of a table and takes the form `WATERMARK FOR rowtime_column_name AS watermark_strategy_expression`.
+In the example above, the metadata column `record_time` becomes part of the table's schema and can be
+transformed and stored like a regular column:
+
+{% highlight sql %}
+INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
+{% endhighlight %}
+
+For convenience, the `FROM` clause can be omitted if the column name should be used as the identifying metadata key:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING,
+ `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA -- use column name as metadata key
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
+
+For convenience, the runtime will perform an explicit cast if the data type of the column differs from
+the data type of the metadata field. Of course this requires that the two data types are compatible.
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `user_id` BIGINT,
+ `name` STRING,
+ `timestamp` BIGINT METADATA -- cast the timestamp as BIGINT
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
+
+By default, the planner assumes that a metadata column can be used for both reading and writing. However,
+in many cases an external system provides more read-only metadata fields than writable fields. Therefore,
+it is possible to exclude metadata columns from persisting using the `VIRTUAL` keyword.
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+ `timestamp` BIGINT METADATA, -- part of the query-to-sink schema
+ `offset` BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
+ `user_id` BIGINT,
+ `name` STRING
+) WITH (
+ 'connector' = 'kafka'
+ ...
+);
+{% endhighlight %}
+
+In the example above, the `offset` is a read-only metadata column and excluded from the query-to-sink
+schema. Thus, the source-to-query schema (for `SELECT`) and the query-to-sink schema (for `INSERT INTO`) differ:
+
+{% highlight text %}
+source-to-query schema:
+MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
+
+query-to-sink schema:
+MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
+{% endhighlight %}
+
+**Computed Columns**
+
+Computed columns are virtual columns that are generated using the syntax `column_name AS computed_column_expression`.
+
+A computed column evaluates an expression that can reference other columns declared in the same table.
+Both physical columns and metadata columns can be accessed if they precede the computed column in the
+schema declaration. The column itself is not physically stored within the table. The column's data type
Review comment:
Can you clarify when the computed column is evaluated? At the source or on demand? Or both?
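
To make the question concrete with the page's own `cost AS price * quantity` example (column types here are just placeholders on my part): is `cost` computed once when the source emits the row, or lazily whenever a query references it?

{% highlight sql %}
CREATE TABLE MyTable (
  `price` DOUBLE,
  `quantity` BIGINT,
  `cost` AS price * quantity  -- evaluated at the source, on demand, or both?
) WITH (
  ...
);
{% endhighlight %}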
##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -181,6 +199,12 @@ Connector Options
Features
----------------
+### Key and Value Formats
+
+See the [regular Kafka connector]({% link dev/table/connectors/kafka.md %}#key-and-value-formats) for more
+explanation around key and value formats. However, note that this connector requires both a key and
+value format where the key fields are derived from the `PRIMARY KEY` constraint.
Review comment:
Can you add an example? I suspect this will be a very common use case.
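
Something along these lines, perhaps? Just a sketch from my side; I'm assuming the table name and that the `'key.format'`/`'value.format'` options carry over from the regular Kafka page, so please correct anything that doesn't hold:

{% highlight sql %}
CREATE TABLE UpsertKafkaTable (
  `user_id` BIGINT,
  `name` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED  -- key fields are derived from this constraint
) WITH (
  'connector' = 'upsert-kafka',
  ...

  'key.format' = 'json',
  'value.format' = 'json'
);
{% endhighlight %}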
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]