aalexandrov commented on a change in pull request #13770: URL: https://github.com/apache/flink/pull/13770#discussion_r517349077
########## File path: docs/dev/table/connectors/kinesis.md ########## @@ -0,0 +1,334 @@ +--- +title: "Amazon Kinesis Data Streams SQL Connector" +nav-title: Kinesis +nav-parent_id: sql-connectors +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<span class="label label-primary">Scan Source: Unbounded</span> +<span class="label label-primary">Sink: Streaming Append Mode</span> + +* This will be replaced by the TOC +{:toc} + +The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/). + +Dependencies +------------ + +To use the connector, add the following Maven dependency to your project: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +How to create a Kinesis data stream table +----------------------------------------- + +Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream. +The following example shows how to create a table backed by a Kinesis data stream: + +<div class="codetabs" markdown="1"> +<div data-lang="SQL" markdown="1"> +{% highlight sql %} +CREATE TABLE KinesisTable ( + user_id BIGINT, + item_id BIGINT, + category_id BIGINT, + behavior STRING, + ts TIMESTAMP(3) +) PARTITIONED BY (user_id, item_id) WITH ( + 'connector' = 'kinesis', + 'stream' = 'user_behavior', + 'properties.aws.region' = 'us-east-2', + 'properties.flink.stream.initpos' = 'LATEST', + 'format' = 'csv' +) +{% endhighlight %} +</div> +</div> + +Available Metadata +------------------ + +The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Column Name</th> + <th class="text-center" style="width: 45%">Column Type</th> + <th class="text-center" style="width: 35%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">ApproximateArrivalTimestamp</a></code></td> + <td><code>TIMESTAMP(3) WITH LOCAL TIMEZONE NOT NULL</code></td> + <td>The approximate time that the record was inserted into the stream.</td> + </tr> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">ShardId</a></code></td> + <td><code>VARCHAR(128) NOT NULL</code></td> + <td>The unique identifier of the shard within the stream from which the record was read.</td> + </tr> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">SequenceNumber</a></code></td> + <td><code>VARCHAR(128) NOT NULL</code></td> + <td>The unique identifier of the record within its shard.</td> + </tr> + </tbody> +</table> + +The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata columns: + +<div class="codetabs" markdown="1"> +<div data-lang="SQL" markdown="1"> +{% highlight sql %} +CREATE TABLE KinesisTable ( + user_id BIGINT, + item_id BIGINT, + category_id BIGINT, + behavior STRING, + ts TIMESTAMP(3), + arrival_time TIMESTAMP(3) METADATA FROM 'ApproximateArrivalTimestamp' VIRTUAL, + shard_id VARCHAR(128) NOT NULL METADATA FROM 'ShardId' VIRTUAL, + sequence_number VARCHAR(128) NOT NULL METADATA FROM 'SequenceNumber' VIRTUAL +) PARTITIONED BY (user_id, item_id) WITH ( + 'connector' = 'kinesis', + 'stream' = 'user_behavior', + 'properties.aws.region' = 'us-east-2', + 'properties.flink.stream.initpos' = 'LATEST', + 'format' = 'csv' +) +{% endhighlight %} +</div> +</div> + +Connector Options +----------------- + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 7%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 50%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>connector</h5></td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td> + </tr> + <tr> + <td><h5>stream</h5></td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the Kinesis data stream backing this table.</td> + </tr> + <tr> + <td><h5>format</h5></td> + <td>required</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td> + </tr> + <tr> + <td><h5>sink.partitioner</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">random or row-based</td> + <td>String</td> + <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> + </tr> + <tr> + <td><h5>sink.partitioner.field.delimiter</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">|</td> + <td>String</td> + <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> + </tr> + <tr> + <td><h5>properties.aws.region</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS region where the stream is defined. Either this or <code>properties.aws.endpoint</code> are required.</td> + </tr> + <tr> + <td><h5>properties.aws.endpoint</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>properties.aws.region</code> are required.</td> + </tr> + <tr> + <td><h5>properties.aws.credentials.provider</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">AUTO</td> + <td>String</td> + <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td> + </tr> + <tr> + <td><h5>properties.flink.stream.initpos</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">LATEST</td> + <td>String</td> + <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td> + </tr> + <tr> + <td><h5>properties.flink.stream.recordpublisher</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">POLLING</td> + <td>String</td> + <td>The `RecordPublisher` type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + </tr> + <tr> + <td><h5>properties.*</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td> + Other properties to pass to the FlinkKinesisConsumer or FlinkKinesisProducer constructors. + See the constants defined in + <ul> + <li><code>AWSConfigConstants</code>,</li> + <li><code>ConsumerConfigConstants</code>, and</li> + <li><code>ProducerConfigConstants</code></li> + </ul> + for detailed information on the available suffixes. + </td> + </tr> + </tbody> +</table> + +Features +-------- + +### Authorization + +Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams. + +### Authentication + +Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis. +By default, the `AUTO` Credentials Provider is used. +If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider. + +A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `properties.aws.credentials.provider` setting. +Supported values are: + +* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider. +* `BASIC` - Use access key ID and secret key supplied as configuration. +* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables. +* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`. +* `PROFILE` - Use an AWS credentials profile to create the AWS credentials. +* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. +* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. + +### Start Reading Position + +You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `properties.flink.stream.initpos` option. Review comment: I adopted the `properties.*` prefix-based based on a previous discussion (#2228 and #4473). The consensus from the discussion in these PRs was to: - Stick with naming closed to the KDS documentation so people familiar with official Kinesis terminology can find they way around more quickly [see this comment](https://github.com/apache/flink/pull/2228). - Make sure that users can directly pass-through options to the underlying KCL `KinesisProducerConfiguration` class [see this comment](https://github.com/apache/flink/pull/4473#issuecomment-320855718). This refers at components exposed at a lower layer (`DataSet` & `DataStream` APIs) and creates tension with the desire to keep property key names unified across different connectors at the Table API & SQL layers. I suggest to do the following: - Keys that are related to flink-level configuration will be exposed with table-api names and re-mapped internally by the factory components (so for example `properties.flink.stream.initpos` will be exposed at the SQL layer as `scan.stream.initpos`. - Keys that are passed directly to `KinesisProducerConfiguration` will be exposed with `kinesis.*` prefix. ########## File path: docs/dev/table/connectors/kinesis.md ########## @@ -0,0 +1,334 @@ +--- +title: "Amazon Kinesis Data Streams SQL Connector" +nav-title: Kinesis +nav-parent_id: sql-connectors +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<span class="label label-primary">Scan Source: Unbounded</span> +<span class="label label-primary">Sink: Streaming Append Mode</span> + +* This will be replaced by the TOC +{:toc} + +The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/). + +Dependencies +------------ + +To use the connector, add the following Maven dependency to your project: + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +How to create a Kinesis data stream table +----------------------------------------- + +Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream. +The following example shows how to create a table backed by a Kinesis data stream: + +<div class="codetabs" markdown="1"> +<div data-lang="SQL" markdown="1"> +{% highlight sql %} +CREATE TABLE KinesisTable ( + user_id BIGINT, + item_id BIGINT, + category_id BIGINT, + behavior STRING, + ts TIMESTAMP(3) +) PARTITIONED BY (user_id, item_id) WITH ( + 'connector' = 'kinesis', + 'stream' = 'user_behavior', + 'properties.aws.region' = 'us-east-2', + 'properties.flink.stream.initpos' = 'LATEST', + 'format' = 'csv' +) +{% endhighlight %} +</div> +</div> + +Available Metadata +------------------ + +The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Column Name</th> + <th class="text-center" style="width: 45%">Column Type</th> + <th class="text-center" style="width: 35%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">ApproximateArrivalTimestamp</a></code></td> + <td><code>TIMESTAMP(3) WITH LOCAL TIMEZONE NOT NULL</code></td> + <td>The approximate time that the record was inserted into the stream.</td> + </tr> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">ShardId</a></code></td> + <td><code>VARCHAR(128) NOT NULL</code></td> + <td>The unique identifier of the shard within the stream from which the record was read.</td> + </tr> + <tr> + <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">SequenceNumber</a></code></td> + <td><code>VARCHAR(128) NOT NULL</code></td> + <td>The unique identifier of the record within its shard.</td> + </tr> + </tbody> +</table> + +The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata columns: + +<div class="codetabs" markdown="1"> +<div data-lang="SQL" markdown="1"> +{% highlight sql %} +CREATE TABLE KinesisTable ( + user_id BIGINT, + item_id BIGINT, + category_id BIGINT, + behavior STRING, + ts TIMESTAMP(3), + arrival_time TIMESTAMP(3) METADATA FROM 'ApproximateArrivalTimestamp' VIRTUAL, Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
