sjwiesman commented on a change in pull request #14156:
URL: https://github.com/apache/flink/pull/14156#discussion_r527895688



##########
File path: docs/dev/table/connectors/index.md
##########
@@ -95,20 +95,23 @@ Flink natively support various connectors. The following 
tables list all availab
 How to use connectors
 --------
 
-Flink supports to use SQL CREATE TABLE statement to register a table. One can 
define the table name, the table schema, and the table options for connecting 
to an external system.
+Flink supports to use SQL `CREATE TABLE` statement to register a table. One 
can define the table name,
+the table schema, and the table options for connecting to an external system.

Review comment:
       ```suggestion
   Flink supports using SQL `CREATE TABLE` statements to register tables. One 
can define the table name,
   the table schema, and the table options for connecting to an external system.
   ```

##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
 
 {% endhighlight %}
 
-Creates a table with the given name. If a table with the same name already 
exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the 
same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
 
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
 
-A computed column is a virtual column that is generated using the syntax  
"`column_name AS computed_column_expression`". It is generated from a non-query 
expression that uses other columns in the same table and is not physically 
stored within the table. For example, a computed column could be defined as 
`cost AS price * quantity`. The expression may contain any combination of 
physical column, constant, function, or variable. The expression cannot contain 
a subquery.
+Physical columns are regular columns known from databases. They define the 
names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the 
payload that is read from
+and written to an external system. Connectors and formats are using these 
columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between 
physical columns but will not
+influence the final physical schema.
 
-Computed columns are commonly used in Flink for defining [time attributes]({{ 
site.baseurl}}/dev/table/streaming/time_attributes.html) in CREATE TABLE 
statements.
-A [processing time attribute]({{ 
site.baseurl}}/dev/table/streaming/time_attributes.html#processing-time) can be 
defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function.
-On the other hand, computed column can be used to derive event time column 
because an event time column may need to be derived from existing fields, e.g. 
the original field is not `TIMESTAMP(3)` type or is nested in a JSON string.
+The following statement creates a table with only regular columns:
 
-Notes:
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING
+) WITH (
+  ...
+);
+{% endhighlight %}
 
-- A computed column defined on a source table is computed after reading from 
the source, it can be used in the following SELECT query statements.
-- A computed column cannot be the target of an INSERT statement. In INSERT 
statements, the schema of SELECT clause should match the schema of the target 
table without computed columns.
+**Metadata Columns**
+
+Metadata columns are an extension to the SQL standard and allow to access 
connector and/or format specific
+fields for every row of a table. A metadata column is indicated by the 
`METADATA` keyword. For example,
+a metadata column can be be used to read and write the timestamp from and to 
Kafka records for time-based
+operations. The [connector and format documentation]({% link 
dev/table/connectors/index.md %}) lists the
+available metadata fields for every component. However, declaring a metadata 
column in a table's schema
+is optional.
+
+The following statement creates a table with an additional metadata column 
that references the metadata field `timestamp`:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING,
+  `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'    
-- reads and writes a Kafka record's timestamp
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
 
-**WATERMARK**
+Every metadata field is identified by a string-based key and has a documented 
data type. For example,
+the Kafka connector exposes a metadata field with key `timestamp` and data 
type `TIMESTAMP(3) WITH LOCAL TIME ZONE`
+that can be used for both reading and writing records.
 
-The `WATERMARK` defines the event time attributes of a table and takes the 
form `WATERMARK FOR rowtime_column_name  AS watermark_strategy_expression`.
+In the example above, the metadata column `record_time` becomes part of the 
table's schema and can be
+transformed and stored like a regular column:
+
+{% highlight sql %}
+INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND 
FROM MyTable;
+{% endhighlight %}
+
+For convenience, the `FROM` clause can be omitted if the column name should be 
used as the identifying metadata key:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING,
+  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA    -- use column name 
as metadata key
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
+
+For convenience, the runtime will perform an explicit cast if the data type of 
the column differs from
+the data type of the metadata field. Of course this requires that the two data 
types are compatible.

Review comment:
       ```suggestion
   the data type of the metadata field. Of course, this requires that the two 
data types are compatible.
   ```

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -189,6 +362,130 @@ Connector Options
 
 Features
 ----------------
+
+### Key and Value Formats
+
+Both the key and value part of a Kafka record can be serialized to and 
deserialized from raw bytes using
+one of the given [formats]({% link dev/table/connectors/formats/index.md %}).
+
+**Value Format**
+
+Since a key is optional in Kafka records, the following statement reads and 
writes records with a configured
+value format but without a key format. The `'format'` option is a synonym for 
`'value.format'`. All format
+options are prefixed with the format identifier.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (,
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+
+  'format' = 'json',
+  'json.ignore-parse-errors' = 'true'
+)
+{% endhighlight %}
+</div>
+</div>
+
+The value format will be configured with the following data type:
+
+{% highlight text %}
+ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
+{% endhighlight %}
+
+**Key and Value Format**
+
+The following example shows how to specify and configure key and value 
formats. The format options are
+prefixed with either the `'key'` or `'value'` plus format identifier.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (
+  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+
+  'key.format' = 'json',
+  'key.json.ignore-parse-errors' = 'true',
+  'key.fields' = 'user_id;item_id',
+
+  'value.format' = 'json',
+  'value.json.fail-on-missing-field' = 'false',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+The key format includes the fields listed in `'key.fields'` (in the same 
order). Thus, it will be configured with

Review comment:
       Can you specify the delimiter? I'm guessing `;` based on the example but 
I'm not positive. 

##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -85,44 +82,46 @@ The following metadata can be exposed as read-only 
(`VIRTUAL`) columns in a tabl
     </thead>
     <tbody>
     <tr>
-      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp";>'timestamp'</a></code></td>
-      <td><code>TIMESTAMP(3) WITH LOCAL TIMEZONE NOT NULL</code></td>
+      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp";>timestamp</a></code></td>
+      <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL</code></td>
       <td>The approximate time when the record was inserted into the 
stream.</td>
     </tr>
     <tr>
-      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId";>'shard-id'</a></code></td>
+      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId";>shard-id</a></code></td>
       <td><code>VARCHAR(128) NOT NULL</code></td>
       <td>The unique identifier of the shard within the stream from which the 
record was read.</td>
     </tr>
     <tr>
-      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber";>'sequence-number'</a></code></td>
+      <td><code><a 
href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber";>sequence-number</a></code></td>
       <td><code>VARCHAR(128) NOT NULL</code></td>
       <td>The unique identifier of the record within its shard.</td>
     </tr>
     </tbody>
 </table>
 
-The extended `CREATE TABLE` example demonstrates the syntax for exposing these 
metadata columns:
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these 
metadata fields:
 
 <div class="codetabs" markdown="1">
 <div data-lang="SQL" markdown="1">
 {% highlight sql %}
 CREATE TABLE KinesisTable (
- user_id BIGINT,
- item_id BIGINT,
- category_id BIGINT,
- behavior STRING,
- ts TIMESTAMP(3),
- arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
- shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
- sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
-) PARTITIONED BY (user_id, item_id) WITH (
- 'connector' = 'kinesis',
- 'stream' = 'user_behavior',
- 'aws.region' = 'us-east-2',
- 'scan.stream.initpos' = 'LATEST',
- 'format' = 'csv'
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3),
+  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,

Review comment:
       On the debezium page you said only Kafka can forward metadata. Is that 
not true, or did I misunderstand what you said. 

##########
File path: 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
##########
@@ -150,13 +150,16 @@ public Object convert(GenericRowData row, int pos) {
 
                INGESTION_TIMESTAMP(
                        "ingestion-timestamp",
-                       DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                       DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),

Review comment:
       Is this supposed to be part of this PR? 

##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
 
 {% endhighlight %}
 
-Creates a table with the given name. If a table with the same name already 
exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the 
same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
 
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
 
-A computed column is a virtual column that is generated using the syntax  
"`column_name AS computed_column_expression`". It is generated from a non-query 
expression that uses other columns in the same table and is not physically 
stored within the table. For example, a computed column could be defined as 
`cost AS price * quantity`. The expression may contain any combination of 
physical column, constant, function, or variable. The expression cannot contain 
a subquery.
+Physical columns are regular columns known from databases. They define the 
names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the 
payload that is read from
+and written to an external system. Connectors and formats are using these 
columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between 
physical columns but will not
+influence the final physical schema.

Review comment:
       ```suggestion
   Physical columns are regular columns known from databases. They define the 
names, the types, and the
   order of fields in the physical data. Thus, physical columns represent the 
payload that is read from
   and written to an external system. Connectors and formats use these columns 
(in the defined order)
   to configure themselves. Other kinds of columns can be declared between 
physical columns but will not
   influence the final physical schema.
   ```

##########
File path: docs/dev/table/sql/create.md
##########
@@ -182,24 +187,165 @@ CREATE TABLE [catalog_name.][db_name.]table_name
 
 {% endhighlight %}
 
-Creates a table with the given name. If a table with the same name already 
exists in the catalog, an exception is thrown.
+The statement above creates a table with the given name. If a table with the 
same name already exists
+in the catalog, an exception is thrown.
+
+### Columns
 
-**COMPUTED COLUMN**
+**Physical / Regular Columns**
 
-A computed column is a virtual column that is generated using the syntax  
"`column_name AS computed_column_expression`". It is generated from a non-query 
expression that uses other columns in the same table and is not physically 
stored within the table. For example, a computed column could be defined as 
`cost AS price * quantity`. The expression may contain any combination of 
physical column, constant, function, or variable. The expression cannot contain 
a subquery.
+Physical columns are regular columns known from databases. They define the 
names, the types, and the
+order of fields in the physical data. Thus, physical columns represent the 
payload that is read from
+and written to an external system. Connectors and formats are using these 
columns (in the defined order)
+to configure themselves. Other kinds of columns can be declared between 
physical columns but will not
+influence the final physical schema.
 
-Computed columns are commonly used in Flink for defining [time attributes]({{ 
site.baseurl}}/dev/table/streaming/time_attributes.html) in CREATE TABLE 
statements.
-A [processing time attribute]({{ 
site.baseurl}}/dev/table/streaming/time_attributes.html#processing-time) can be 
defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function.
-On the other hand, computed column can be used to derive event time column 
because an event time column may need to be derived from existing fields, e.g. 
the original field is not `TIMESTAMP(3)` type or is nested in a JSON string.
+The following statement creates a table with only regular columns:
 
-Notes:
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING
+) WITH (
+  ...
+);
+{% endhighlight %}
 
-- A computed column defined on a source table is computed after reading from 
the source, it can be used in the following SELECT query statements.
-- A computed column cannot be the target of an INSERT statement. In INSERT 
statements, the schema of SELECT clause should match the schema of the target 
table without computed columns.
+**Metadata Columns**
+
+Metadata columns are an extension to the SQL standard and allow to access 
connector and/or format specific
+fields for every row of a table. A metadata column is indicated by the 
`METADATA` keyword. For example,
+a metadata column can be be used to read and write the timestamp from and to 
Kafka records for time-based
+operations. The [connector and format documentation]({% link 
dev/table/connectors/index.md %}) lists the
+available metadata fields for every component. However, declaring a metadata 
column in a table's schema
+is optional.
+
+The following statement creates a table with an additional metadata column 
that references the metadata field `timestamp`:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING,
+  `record_time` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'    
-- reads and writes a Kafka record's timestamp
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
 
-**WATERMARK**
+Every metadata field is identified by a string-based key and has a documented 
data type. For example,
+the Kafka connector exposes a metadata field with key `timestamp` and data 
type `TIMESTAMP(3) WITH LOCAL TIME ZONE`
+that can be used for both reading and writing records.
 
-The `WATERMARK` defines the event time attributes of a table and takes the 
form `WATERMARK FOR rowtime_column_name  AS watermark_strategy_expression`.
+In the example above, the metadata column `record_time` becomes part of the 
table's schema and can be
+transformed and stored like a regular column:
+
+{% highlight sql %}
+INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND 
FROM MyTable;
+{% endhighlight %}
+
+For convenience, the `FROM` clause can be omitted if the column name should be 
used as the identifying metadata key:
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING,
+  `timestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA    -- use column name 
as metadata key
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
+
+For convenience, the runtime will perform an explicit cast if the data type of 
the column differs from
+the data type of the metadata field. Of course this requires that the two data 
types are compatible.
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `user_id` BIGINT,
+  `name` STRING,
+  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
+
+By default, the planner assumes that a metadata column can be used for both 
reading and writing. However,
+in many cases an external system provides more read-only metadata fields than 
writable fields. Therefore,
+it is possible to exclude metadata columns from persisting using the `VIRTUAL` 
keyword.
+
+{% highlight sql %}
+CREATE TABLE MyTable (
+  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema
+  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema
+  `user_id` BIGINT,
+  `name` STRING,
+) WITH (
+  'connector' = 'kafka'
+  ...
+);
+{% endhighlight %}
+
+In the example above, the `offset` is a read-only metadata column and excluded 
from the query-to-sink
+schema. Thus, source-to-query schema (for `SELECT`) and query-to-sink (for 
`INSERT INTO`) schema differ:
+
+{% highlight text %}
+source-to-query schema:
+MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
+
+query-to-sink schema:
+MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
+{% endhighlight %}
+
+**Computed Columns**
+
+Computed columns are virtual columns that are generated using the syntax 
`column_name AS computed_column_expression`.
+
+A computed column evaluates an expression that can reference other columns 
declared in the same table.
+Both physical columns and metadata columns can be accessed if they preceed the 
computed column in the
+schema declaration. The column itself is not physically stored within the 
table. The column's data type

Review comment:
       Can you clarify when the computed column is evaluated? At the source or 
on demand? Or both? 

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -181,6 +199,12 @@ Connector Options
 Features
 ----------------
 
+### Key and Value Formats
+
+See the [regular Kafka connector]({% link dev/connectors/kafka.md 
%}#key-and-value-formats) for more
+explanation around key and value formats. However, note that this connector 
requires both a key and
+value format where the key fields are derived from the `PRIMARY KEY` 
constraint.

Review comment:
       Can you add an example, I suspect this will be a very common use case. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to