aalexandrov commented on a change in pull request #13770:
URL: https://github.com/apache/flink/pull/13770#discussion_r517349077



##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -0,0 +1,334 @@
+---
+title: "Amazon Kinesis Data Streams SQL Connector"
+nav-title: Kinesis
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Append Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/).
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+How to create a Kinesis data stream table
+-----------------------------------------
+
+Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream.
+The following example shows how to create a table backed by a Kinesis data stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3)
+) PARTITIONED BY (user_id, item_id) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.flink.stream.initpos' = 'LATEST',
+ 'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Available Metadata
+------------------
+
+The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Column Name</th>
+      <th class="text-center" style="width: 45%">Column Type</th>
+      <th class="text-center" style="width: 35%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">ApproximateArrivalTimestamp</a></code></td>
+      <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL</code></td>
+      <td>The approximate time that the record was inserted into the stream.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">ShardId</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the shard within the stream from which the record was read.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">SequenceNumber</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the record within its shard.</td>
+    </tr>
+    </tbody>
+</table>
+
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata columns:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3),
+ arrival_time TIMESTAMP(3) METADATA FROM 'ApproximateArrivalTimestamp' VIRTUAL,
+ shard_id VARCHAR(128) NOT NULL METADATA FROM 'ShardId' VIRTUAL,
+ sequence_number VARCHAR(128) NOT NULL METADATA FROM 'SequenceNumber' VIRTUAL
+) PARTITIONED BY (user_id, item_id) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.flink.stream.initpos' = 'LATEST',
+ 'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner.field.delimiter</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">|</td>
+      <td>String</td>
+      <td>Optional field delimiter for a fields-based partitioner derived from a <code>PARTITION BY</code> clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.region</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined. Either this option or <code>properties.aws.endpoint</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this option or <code>properties.aws.region</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.initpos</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.recordpublisher</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.*</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+      Other properties to pass to the <code>FlinkKinesisConsumer</code> or <code>FlinkKinesisProducer</code> constructors.
+      See the constants defined in 
+      <ul>
+        <li><code>AWSConfigConstants</code>,</li> 
+        <li><code>ConsumerConfigConstants</code>, and</li> 
+        <li><code>ProducerConfigConstants</code></li>
+      </ul>
+      for detailed information on the available suffixes.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+--------
+
+### Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams.
+
+### Authentication
+
+Depending on your deployment, you would choose a different credentials provider to allow access to Kinesis.
+By default, the `AUTO` credentials provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `properties.aws.credentials.provider` setting.
+Supported values are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and the EC2/ECS credentials provider.
+* `BASIC` - Use the access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Use the `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use the Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using a Web Identity Token.
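+
+For illustration, a table definition using the `BASIC` provider could look as follows. Note that the `properties.aws.credentials.provider.basic.*` key names below are an assumption derived from the suffixes defined in `AWSConfigConstants` and should be verified against the connector implementation:
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ behavior STRING
+) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.aws.credentials.provider' = 'BASIC',
+ -- hypothetical key names, based on the AWSConfigConstants suffixes
+ 'properties.aws.credentials.provider.basic.accesskeyid' = '<access-key-id>',
+ 'properties.aws.credentials.provider.basic.secretkey' = '<secret-key>',
+ 'format' = 'csv'
+)
+{% endhighlight %}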
+
+### Start Reading Position
+
+You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `properties.flink.stream.initpos` option.
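+
+For example, to start reading at a specific point in time you would combine the `AT_TIMESTAMP` position with a timestamp option. The values mirror the `ConsumerConfigConstants` used by the `DataStream` connector; treat the exact `initpos.timestamp` key spelling as an assumption to be checked against the implementation:
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ behavior STRING
+) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ -- supported positions: LATEST, TRIM_HORIZON, AT_TIMESTAMP
+ 'properties.flink.stream.initpos' = 'AT_TIMESTAMP',
+ -- hypothetical key name for the AT_TIMESTAMP position
+ 'properties.flink.stream.initpos.timestamp' = '2020-11-01T00:00:00.000-00:00',
+ 'format' = 'csv'
+)
+{% endhighlight %}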

Review comment:
       I adopted the `properties.*` prefix based on previous discussions (#2228 and #4473). The consensus from the discussion in these PRs was to:
   
   - Stick with naming close to the KDS documentation, so that people familiar with official Kinesis terminology can find their way around more quickly [see this comment](https://github.com/apache/flink/pull/2228).
   - Make sure that users can pass options directly through to the underlying KPL `KinesisProducerConfiguration` class [see this comment](https://github.com/apache/flink/pull/4473#issuecomment-320855718).
   
   This refers to components exposed at a lower layer (the `DataSet` & `DataStream` APIs) and creates tension with the desire to keep property key names unified across different connectors at the Table API & SQL layer.
   
   I suggest the following:
   
   - Keys related to Flink-level configuration will be exposed with Table API names and re-mapped internally by the factory components (so, for example, `properties.flink.stream.initpos` will be exposed at the SQL layer as `scan.stream.initpos`).
   - Keys that are passed directly to `KinesisProducerConfiguration` will be exposed with a `kinesis.*` prefix.
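   
   As a sketch, a `WITH` clause under this proposal might look like the following (these option names reflect the suggestion above, not an implemented API; `RateLimit` is one example of a `KinesisProducerConfiguration` property):
   
   ```sql
   CREATE TABLE KinesisTable (...) WITH (
     'connector' = 'kinesis',
     'stream' = 'user_behavior',
     'scan.stream.initpos' = 'LATEST',  -- Flink-level key, exposed with a Table API name
     'kinesis.RateLimit' = '50',        -- passed through to KinesisProducerConfiguration
     'format' = 'csv'
   )
   ```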

##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -0,0 +1,334 @@
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3),
+ arrival_time TIMESTAMP(3) METADATA FROM 'ApproximateArrivalTimestamp' VIRTUAL,

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

